Skip to content

Commit 27d761f

Browse files
committed
fix
1 parent 5912639 commit 27d761f

File tree

3 files changed

+50
-13
lines changed

3 files changed

+50
-13
lines changed

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -476,10 +476,11 @@ protected Runnable newExpireRunnable() {
476476
}
477477

478478
private Optional<TableSchema> tryTimeTravel(Options options) {
479-
CoreOptions coreOptions = new CoreOptions(options);
480479
Snapshot snapshot;
481480
try {
482-
snapshot = TimeTravelUtil.resolveSnapshotFromOptions(coreOptions, snapshotManager());
481+
snapshot =
482+
TimeTravelUtil.resolveSnapshotFromOptions(
483+
options, snapshotManager(), tagManager());
483484
} catch (Exception e) {
484485
return Optional.empty();
485486
}

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020

2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.Snapshot;
23+
import org.apache.paimon.options.Options;
2324
import org.apache.paimon.schema.SchemaManager;
2425
import org.apache.paimon.schema.TableSchema;
2526
import org.apache.paimon.table.FileStoreTable;
2627
import org.apache.paimon.utils.ChangelogManager;
2728
import org.apache.paimon.utils.FunctionWithException;
2829
import org.apache.paimon.utils.SnapshotManager;
30+
import org.apache.paimon.utils.TagManager;
2931

3032
import org.slf4j.Logger;
3133
import org.slf4j.LoggerFactory;
@@ -44,6 +46,8 @@ public class TimeTravelUtil {
4446

4547
private static final Logger LOG = LoggerFactory.getLogger(TimeTravelUtil.class);
4648

49+
private static final String WATERMARK_PREFIX = "watermark-";
50+
4751
private static final String[] SCAN_KEYS = {
4852
CoreOptions.SCAN_SNAPSHOT_ID.key(),
4953
CoreOptions.SCAN_TAG_NAME.key(),
@@ -53,15 +57,18 @@ public class TimeTravelUtil {
5357

5458
@Nullable
5559
public static Snapshot resolveSnapshot(FileStoreTable table) {
56-
return resolveSnapshotFromOptions(table.coreOptions(), table.snapshotManager());
60+
return resolveSnapshotFromOptions(
61+
table.coreOptions().toConfiguration(), table.snapshotManager(), table.tagManager());
5762
}
5863

5964
@Nullable
6065
public static Snapshot resolveSnapshotFromOptions(
61-
CoreOptions options, SnapshotManager snapshotManager) {
66+
Options options, SnapshotManager snapshotManager, TagManager tagManager) {
67+
adaptScanVersion(options, tagManager);
68+
6269
List<String> scanHandleKey = new ArrayList<>(1);
6370
for (String key : SCAN_KEYS) {
64-
if (options.toConfiguration().containsKey(key)) {
71+
if (options.containsKey(key)) {
6572
scanHandleKey.add(key);
6673
}
6774
}
@@ -81,28 +88,49 @@ public static Snapshot resolveSnapshotFromOptions(
8188

8289
String key = scanHandleKey.get(0);
8390
Snapshot snapshot = null;
91+
CoreOptions coreOptions = new CoreOptions(options);
8492
if (key.equals(CoreOptions.SCAN_SNAPSHOT_ID.key())) {
8593
snapshot =
86-
new StaticFromSnapshotStartingScanner(snapshotManager, options.scanSnapshotId())
94+
new StaticFromSnapshotStartingScanner(
95+
snapshotManager, coreOptions.scanSnapshotId())
8796
.getSnapshot();
8897
} else if (key.equals(CoreOptions.SCAN_WATERMARK.key())) {
8998
snapshot =
90-
new StaticFromWatermarkStartingScanner(snapshotManager, options.scanWatermark())
99+
new StaticFromWatermarkStartingScanner(
100+
snapshotManager, coreOptions.scanWatermark())
91101
.getSnapshot();
92102
} else if (key.equals(CoreOptions.SCAN_TIMESTAMP_MILLIS.key())) {
93103
snapshot =
94104
new StaticFromTimestampStartingScanner(
95-
snapshotManager, options.scanTimestampMills())
105+
snapshotManager, coreOptions.scanTimestampMills())
96106
.getSnapshot();
97107
} else if (key.equals(CoreOptions.SCAN_TAG_NAME.key())) {
98108
snapshot =
99-
new StaticFromTagStartingScanner(snapshotManager, options.scanTagName())
109+
new StaticFromTagStartingScanner(snapshotManager, coreOptions.scanTagName())
100110
.getSnapshot();
101111
}
102112

103113
return snapshot;
104114
}
105115

116+
private static void adaptScanVersion(Options options, TagManager tagManager) {
117+
String version = options.remove(CoreOptions.SCAN_VERSION.key());
118+
if (version == null) {
119+
return;
120+
}
121+
122+
if (tagManager.tagExists(version)) {
123+
options.set(CoreOptions.SCAN_TAG_NAME, version);
124+
} else if (version.startsWith(WATERMARK_PREFIX)) {
125+
long watermark = Long.parseLong(version.substring(WATERMARK_PREFIX.length()));
126+
options.set(CoreOptions.SCAN_WATERMARK, watermark);
127+
} else if (version.chars().allMatch(Character::isDigit)) {
128+
options.set(CoreOptions.SCAN_SNAPSHOT_ID.key(), version);
129+
} else {
130+
throw new RuntimeException("Cannot find a time travel version for " + version);
131+
}
132+
}
133+
106134
/**
107135
* Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
108136
* returned if all snapshots are equal to or later than the timestamp mills.

paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/TimeTravelUtilsTest.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,29 +56,37 @@ public void testResolveSnapshotFromOptions() throws Exception {
5656
HashMap<String, String> optMap = new HashMap<>(4);
5757
optMap.put("scan.snapshot-id", "2");
5858
CoreOptions options = CoreOptions.fromMap(optMap);
59-
Snapshot snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, snapshotManager);
59+
Snapshot snapshot =
60+
TimeTravelUtil.resolveSnapshotFromOptions(
61+
options.toConfiguration(), snapshotManager, null);
6062
assertNotNull(snapshot);
6163
assertTrue(snapshot.id() == 2);
6264

6365
optMap.clear();
6466
optMap.put("scan.timestamp-millis", ts + "");
6567
options = CoreOptions.fromMap(optMap);
66-
snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, snapshotManager);
68+
snapshot =
69+
TimeTravelUtil.resolveSnapshotFromOptions(
70+
options.toConfiguration(), snapshotManager, null);
6771
assertTrue(snapshot.id() == 1);
6872

6973
table.createTag("tag3", 3);
7074
optMap.clear();
7175
optMap.put("scan.tag-name", "tag3");
7276
options = CoreOptions.fromMap(optMap);
73-
snapshot = TimeTravelUtil.resolveSnapshotFromOptions(options, snapshotManager);
77+
snapshot =
78+
TimeTravelUtil.resolveSnapshotFromOptions(
79+
options.toConfiguration(), snapshotManager, null);
7480
assertTrue(snapshot.id() == 3);
7581

7682
// if contain more scan.xxx config would throw out
7783
optMap.put("scan.snapshot-id", "2");
7884
CoreOptions options1 = CoreOptions.fromMap(optMap);
7985
assertThrows(
8086
IllegalArgumentException.class,
81-
() -> TimeTravelUtil.resolveSnapshotFromOptions(options1, snapshotManager),
87+
() ->
88+
TimeTravelUtil.resolveSnapshotFromOptions(
89+
options1.toConfiguration(), snapshotManager, null),
8290
"scan.snapshot-id scan.tag-name scan.watermark and scan.timestamp-millis can contains only one");
8391

8492
assertThat(

0 commit comments

Comments
 (0)