Skip to content

Commit 5801977

Browse files
committed
[core] Support rollback to tag from snapshot id
1 parent a88470a commit 5801977

File tree

2 files changed

+40
-5
lines changed

2 files changed

+40
-5
lines changed

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,6 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
8585

8686
private static final long serialVersionUID = 1L;
8787

88-
private static final String WATERMARK_PREFIX = "watermark-";
89-
9088
protected final FileIO fileIO;
9189
protected final Path path;
9290
protected final TableSchema tableSchema;
@@ -502,14 +500,22 @@ public void rollbackTo(long snapshotId) {
502500
try {
503501
snapshotManager.rollback(Instant.snapshot(snapshotId));
504502
} catch (UnsupportedOperationException e) {
505-
Snapshot snapshot;
506503
try {
507-
snapshot = snapshotManager.tryGetSnapshot(snapshotId);
504+
Snapshot snapshot = snapshotManager.tryGetSnapshot(snapshotId);
505+
rollbackHelper().cleanLargerThan(snapshot);
508506
} catch (FileNotFoundException ex) {
507+
// try to get snapshot from tag
508+
TagManager tagManager = tagManager();
509+
SortedMap<Snapshot, List<String>> tags = tagManager.tags();
510+
for (Map.Entry<Snapshot, List<String>> entry : tags.entrySet()) {
511+
if (entry.getKey().id() == snapshotId) {
512+
rollbackTo(entry.getValue().get(0));
513+
return;
514+
}
515+
}
509516
throw new IllegalArgumentException(
510517
String.format("Rollback snapshot '%s' doesn't exist.", snapshotId), ex);
511518
}
512-
rollbackHelper().cleanLargerThan(snapshot);
513519
}
514520
}
515521

paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -931,6 +931,35 @@ public void testRollbackToTag(boolean expire) throws Exception {
931931
assertRollbackTo(table, singletonList(1L), 1, 1, singletonList("test1"));
932932
}
933933

934+
@Test
935+
public void testRollbackToTagFromSnapshotId() throws Exception {
936+
int commitTimes = ThreadLocalRandom.current().nextInt(100) + 5;
937+
FileStoreTable table = prepareRollbackTable(commitTimes);
938+
939+
table.createTag("test", 1);
940+
941+
// expire snapshots
942+
Options options = new Options();
943+
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 5);
944+
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 5);
945+
options.set(SNAPSHOT_EXPIRE_LIMIT, Integer.MAX_VALUE);
946+
options.set(CHANGELOG_NUM_RETAINED_MIN, 5);
947+
options.set(CHANGELOG_NUM_RETAINED_MAX, 5);
948+
table.copy(options.toMap()).newCommit("").expireSnapshots();
949+
950+
table.rollbackTo(1);
951+
ReadBuilder readBuilder = table.newReadBuilder();
952+
List<String> result =
953+
getResult(
954+
readBuilder.newRead(),
955+
readBuilder.newScan().plan().splits(),
956+
BATCH_ROW_TO_STRING);
957+
assertThat(result)
958+
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
959+
960+
assertRollbackTo(table, singletonList(1L), 1, 1, singletonList("test"));
961+
}
962+
934963
private FileStoreTable prepareRollbackTable(int commitTimes) throws Exception {
935964
FileStoreTable table = createFileStoreTable();
936965
return prepareRollbackTable(commitTimes, table);

0 commit comments

Comments
 (0)