Skip to content

Commit 2f95d56

Browse files
committed
fix
1 parent 27d761f commit 2f95d56

File tree

5 files changed

+28
-22
lines changed

5 files changed

+28
-22
lines changed

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
8686

8787
private static final long serialVersionUID = 1L;
8888

89+
private static final String WATERMARK_PREFIX = "watermark-";
90+
8991
protected final FileIO fileIO;
9092
protected final Path path;
9193
protected final TableSchema tableSchema;
@@ -188,7 +190,7 @@ public String uuid() {
188190

189191
@Override
190192
public Optional<Statistics> statistics() {
191-
Snapshot snapshot = TimeTravelUtil.resolveSnapshot(this);
193+
Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(this);
192194
if (snapshot != null) {
193195
String file = snapshot.statistics();
194196
if (file == null) {
@@ -479,8 +481,8 @@ private Optional<TableSchema> tryTimeTravel(Options options) {
479481
Snapshot snapshot;
480482
try {
481483
snapshot =
482-
TimeTravelUtil.resolveSnapshotFromOptions(
483-
options, snapshotManager(), tagManager());
484+
TimeTravelUtil.tryTravelToSnapshot(options, snapshotManager(), tagManager())
485+
.orElse(null);
484486
} catch (Exception e) {
485487
return Optional.empty();
486488
}

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.io.FileNotFoundException;
3838
import java.util.ArrayList;
3939
import java.util.List;
40+
import java.util.Optional;
4041

4142
import static org.apache.paimon.utils.Preconditions.checkArgument;
4243
import static org.apache.paimon.utils.SnapshotManager.EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
@@ -55,14 +56,16 @@ public class TimeTravelUtil {
5556
CoreOptions.SCAN_TIMESTAMP_MILLIS.key()
5657
};
5758

58-
@Nullable
59-
public static Snapshot resolveSnapshot(FileStoreTable table) {
60-
return resolveSnapshotFromOptions(
59+
public static Snapshot tryTravelOrLatest(FileStoreTable table) {
60+
return tryTravelToSnapshot(table).orElseGet(() -> table.latestSnapshot().orElse(null));
61+
}
62+
63+
public static Optional<Snapshot> tryTravelToSnapshot(FileStoreTable table) {
64+
return tryTravelToSnapshot(
6165
table.coreOptions().toConfiguration(), table.snapshotManager(), table.tagManager());
6266
}
6367

64-
@Nullable
65-
public static Snapshot resolveSnapshotFromOptions(
68+
public static Optional<Snapshot> tryTravelToSnapshot(
6669
Options options, SnapshotManager snapshotManager, TagManager tagManager) {
6770
adaptScanVersion(options, tagManager);
6871

@@ -74,7 +77,7 @@ public static Snapshot resolveSnapshotFromOptions(
7477
}
7578

7679
if (scanHandleKey.isEmpty()) {
77-
return snapshotManager.latestSnapshot();
80+
return Optional.empty();
7881
}
7982

8083
checkArgument(
@@ -87,8 +90,8 @@ public static Snapshot resolveSnapshotFromOptions(
8790
CoreOptions.SCAN_TIMESTAMP_MILLIS.key()));
8891

8992
String key = scanHandleKey.get(0);
90-
Snapshot snapshot = null;
9193
CoreOptions coreOptions = new CoreOptions(options);
94+
Snapshot snapshot;
9295
if (key.equals(CoreOptions.SCAN_SNAPSHOT_ID.key())) {
9396
snapshot =
9497
new StaticFromSnapshotStartingScanner(
@@ -108,9 +111,10 @@ public static Snapshot resolveSnapshotFromOptions(
108111
snapshot =
109112
new StaticFromTagStartingScanner(snapshotManager, coreOptions.scanTagName())
110113
.getSnapshot();
114+
} else {
115+
throw new UnsupportedOperationException("Unsupported time travel mode: " + key);
111116
}
112-
113-
return snapshot;
117+
return Optional.of(snapshot);
114118
}
115119

116120
private static void adaptScanVersion(Options options, TagManager tagManager) {

paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ private InternalRow toRow(
229229

230230
private static List<ManifestFileMeta> allManifests(FileStoreTable dataTable) {
231231
CoreOptions options = dataTable.coreOptions();
232-
Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable);
232+
Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(dataTable);
233233
if (snapshot == null) {
234234
LOG.warn("Check if your snapshot is empty.");
235235
return Collections.emptyList();

paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ private InternalRow toRow(
230230

231231
private static List<IndexManifestEntry> allIndexEntries(FileStoreTable dataTable) {
232232
IndexFileHandler indexFileHandler = dataTable.store().newIndexFileHandler();
233-
Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable);
233+
Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(dataTable);
234234
if (snapshot == null) {
235235
LOG.warn("Check if your snapshot is empty.");
236236
return Collections.emptyList();

paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
public class TimeTravelUtilsTest extends ScannerTestBase {
3838

3939
@Test
40-
public void testResolveSnapshotFromOptions() throws Exception {
40+
public void testtryTravelToSnapshot() throws Exception {
4141
SnapshotManager snapshotManager = table.snapshotManager();
4242
StreamTableWrite write = table.newWrite(commitUser);
4343
StreamTableCommit commit = table.newCommit(commitUser);
@@ -57,26 +57,26 @@ public void testResolveSnapshotFromOptions() throws Exception {
5757
optMap.put("scan.snapshot-id", "2");
5858
CoreOptions options = CoreOptions.fromMap(optMap);
5959
Snapshot snapshot =
60-
TimeTravelUtil.resolveSnapshotFromOptions(
61-
options.toConfiguration(), snapshotManager, null);
60+
TimeTravelUtil.tryTravelToSnapshot(options.toConfiguration(), snapshotManager, null)
61+
.orElse(null);
6262
assertNotNull(snapshot);
6363
assertTrue(snapshot.id() == 2);
6464

6565
optMap.clear();
6666
optMap.put("scan.timestamp-millis", ts + "");
6767
options = CoreOptions.fromMap(optMap);
6868
snapshot =
69-
TimeTravelUtil.resolveSnapshotFromOptions(
70-
options.toConfiguration(), snapshotManager, null);
69+
TimeTravelUtil.tryTravelToSnapshot(options.toConfiguration(), snapshotManager, null)
70+
.orElse(null);
7171
assertTrue(snapshot.id() == 1);
7272

7373
table.createTag("tag3", 3);
7474
optMap.clear();
7575
optMap.put("scan.tag-name", "tag3");
7676
options = CoreOptions.fromMap(optMap);
7777
snapshot =
78-
TimeTravelUtil.resolveSnapshotFromOptions(
79-
options.toConfiguration(), snapshotManager, null);
78+
TimeTravelUtil.tryTravelToSnapshot(options.toConfiguration(), snapshotManager, null)
79+
.orElse(null);
8080
assertTrue(snapshot.id() == 3);
8181

8282
// if contain more scan.xxx config would throw out
@@ -85,7 +85,7 @@ public void testResolveSnapshotFromOptions() throws Exception {
8585
assertThrows(
8686
IllegalArgumentException.class,
8787
() ->
88-
TimeTravelUtil.resolveSnapshotFromOptions(
88+
TimeTravelUtil.tryTravelToSnapshot(
8989
options1.toConfiguration(), snapshotManager, null),
9090
"scan.snapshot-id scan.tag-name scan.watermark and scan.timestamp-millis can contains only one");
9191

0 commit comments

Comments
 (0)