Skip to content

Commit 8d7c13b

Browse files
committed
[core] Simplify rollback to only clean snapshots
1 parent bc25e88 commit 8d7c13b

File tree

7 files changed

+96
-222
lines changed

7 files changed

+96
-222
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public void cleanUnusedManifests(Changelog changelog, Set<String> skippingSet) {
8787
// the index and statics manifest list should handle by snapshot deletion.
8888
}
8989

90+
@Override
9091
public Set<String> manifestSkippingSet(List<Snapshot> skippingSnapshots) {
9192
Set<String> skippingSet = new HashSet<>();
9293

paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737

3838
import java.io.IOException;
3939
import java.util.Collection;
40-
import java.util.Collections;
4140
import java.util.HashMap;
4241
import java.util.HashSet;
4342
import java.util.List;
@@ -106,10 +105,6 @@ public void cleanUnusedManifests(Snapshot taggedSnapshot, Set<String> skippingSe
106105
cleanUnusedManifests(taggedSnapshot, skippingSet, true, false);
107106
}
108107

109-
public Predicate<ExpireFileEntry> dataFileSkipper(Snapshot fromSnapshot) throws Exception {
110-
return dataFileSkipper(Collections.singletonList(fromSnapshot));
111-
}
112-
113108
public Predicate<ExpireFileEntry> dataFileSkipper(List<Snapshot> fromSnapshots)
114109
throws Exception {
115110
Map<BinaryRow, Map<Integer, Set<String>>> skipped = new HashMap<>();

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 23 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@
6868

6969
import javax.annotation.Nullable;
7070

71-
import java.io.IOException;
72-
import java.io.UncheckedIOException;
71+
import java.io.FileNotFoundException;
7372
import java.time.Duration;
7473
import java.util.HashMap;
7574
import java.util.List;
@@ -80,7 +79,6 @@
8079
import java.util.function.BiConsumer;
8180

8281
import static org.apache.paimon.CoreOptions.PATH;
83-
import static org.apache.paimon.utils.Preconditions.checkArgument;
8482

8583
/** Abstract {@link FileStoreTable}. */
8684
abstract class AbstractFileStoreTable implements FileStoreTable {
@@ -504,11 +502,27 @@ public void rollbackTo(long snapshotId) {
504502
try {
505503
snapshotManager.rollback(Instant.snapshot(snapshotId));
506504
} catch (UnsupportedOperationException e) {
507-
checkArgument(
508-
snapshotManager.snapshotExists(snapshotId),
509-
"Rollback snapshot '%s' doesn't exist.",
510-
snapshotId);
511-
rollbackHelper().updateLatestAndCleanLargerThan(snapshotManager.snapshot(snapshotId));
505+
Snapshot snapshot;
506+
try {
507+
snapshot = snapshotManager.tryGetSnapshot(snapshotId);
508+
} catch (FileNotFoundException ex) {
509+
throw new IllegalArgumentException(
510+
String.format("Rollback snapshot '%s' doesn't exist.", snapshotId), ex);
511+
}
512+
rollbackHelper().cleanLargerThan(snapshot);
513+
}
514+
}
515+
516+
@Override
517+
public void rollbackTo(String tagName) {
518+
SnapshotManager snapshotManager = snapshotManager();
519+
try {
520+
snapshotManager.rollback(Instant.tag(tagName));
521+
} catch (UnsupportedOperationException e) {
522+
Snapshot taggedSnapshot = tagManager().getOrThrow(tagName).trimToSnapshot();
523+
RollbackHelper rollbackHelper = rollbackHelper();
524+
rollbackHelper.cleanLargerThan(taggedSnapshot);
525+
rollbackHelper.createSnapshotFileIfNeeded(taggedSnapshot);
512526
}
513527
}
514528

@@ -644,38 +658,6 @@ public void fastForward(String branchName) {
644658
branchManager().fastForward(branchName);
645659
}
646660

647-
@Override
648-
public void rollbackTo(String tagName) {
649-
SnapshotManager snapshotManager = snapshotManager();
650-
try {
651-
snapshotManager.rollback(Instant.tag(tagName));
652-
return;
653-
} catch (UnsupportedOperationException ignore) {
654-
655-
}
656-
TagManager tagManager = tagManager();
657-
checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' doesn't exist.", tagName);
658-
659-
Snapshot taggedSnapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
660-
rollbackHelper().updateLatestAndCleanLargerThan(taggedSnapshot);
661-
662-
try {
663-
// it is possible that the earliest snapshot is later than the rollback tag because of
664-
// snapshot expiration, in this case the `cleanLargerThan` method will delete all
665-
// snapshots, so we should write the tag file to snapshot directory and modify the
666-
// earliest hint
667-
if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
668-
fileIO.writeFile(
669-
snapshotManager().snapshotPath(taggedSnapshot.id()),
670-
fileIO.readFileUtf8(tagManager.tagPath(tagName)),
671-
false);
672-
snapshotManager.commitEarliestHint(taggedSnapshot.id());
673-
}
674-
} catch (IOException e) {
675-
throw new UncheckedIOException(e);
676-
}
677-
}
678-
679661
@Override
680662
public TagManager tagManager() {
681663
return new TagManager(fileIO, path, currentBranch());
@@ -713,14 +695,7 @@ public FileStoreTable switchToBranch(String branchName) {
713695
}
714696

715697
private RollbackHelper rollbackHelper() {
716-
return new RollbackHelper(
717-
snapshotManager(),
718-
changelogManager(),
719-
tagManager(),
720-
fileIO,
721-
store().newSnapshotDeletion(),
722-
store().newChangelogDeletion(),
723-
store().newTagDeletion());
698+
return new RollbackHelper(snapshotManager(), changelogManager(), tagManager(), fileIO);
724699
}
725700

726701
protected RowKindGenerator rowKindGenerator() {

paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java

Lines changed: 37 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -21,135 +21,91 @@
2121
import org.apache.paimon.Changelog;
2222
import org.apache.paimon.Snapshot;
2323
import org.apache.paimon.fs.FileIO;
24-
import org.apache.paimon.manifest.ExpireFileEntry;
25-
import org.apache.paimon.operation.ChangelogDeletion;
26-
import org.apache.paimon.operation.SnapshotDeletion;
27-
import org.apache.paimon.operation.TagDeletion;
2824
import org.apache.paimon.utils.ChangelogManager;
2925
import org.apache.paimon.utils.SnapshotManager;
3026
import org.apache.paimon.utils.TagManager;
3127

32-
import org.slf4j.Logger;
33-
import org.slf4j.LoggerFactory;
34-
3528
import java.io.IOException;
3629
import java.io.UncheckedIOException;
3730
import java.util.ArrayList;
38-
import java.util.Collections;
39-
import java.util.HashSet;
4031
import java.util.List;
41-
import java.util.Set;
4232
import java.util.SortedMap;
43-
import java.util.function.Predicate;
4433

4534
import static org.apache.paimon.utils.Preconditions.checkNotNull;
4635

4736
/** Helper class for {@link Table#rollbackTo} including utils to clean snapshots. */
4837
public class RollbackHelper {
4938

50-
private static final Logger LOG = LoggerFactory.getLogger(RollbackHelper.class);
51-
5239
private final SnapshotManager snapshotManager;
5340
private final ChangelogManager changelogManager;
5441
private final TagManager tagManager;
5542
private final FileIO fileIO;
56-
private final SnapshotDeletion snapshotDeletion;
57-
private final ChangelogDeletion changelogDeletion;
58-
private final TagDeletion tagDeletion;
5943

6044
public RollbackHelper(
6145
SnapshotManager snapshotManager,
6246
ChangelogManager changelogManager,
6347
TagManager tagManager,
64-
FileIO fileIO,
65-
SnapshotDeletion snapshotDeletion,
66-
ChangelogDeletion changelogDeletion,
67-
TagDeletion tagDeletion) {
48+
FileIO fileIO) {
6849
this.snapshotManager = snapshotManager;
6950
this.changelogManager = changelogManager;
7051
this.tagManager = tagManager;
7152
this.fileIO = fileIO;
72-
this.snapshotDeletion = snapshotDeletion;
73-
this.changelogDeletion = changelogDeletion;
74-
this.tagDeletion = tagDeletion;
7553
}
7654

7755
/** Clean snapshots and tags whose id is larger than given snapshot's and update latest hit. */
78-
public void updateLatestAndCleanLargerThan(Snapshot retainedSnapshot) {
79-
// clean data files
80-
List<Snapshot> cleanedSnapshots = updateLatestAndCleanSnapshotsDataFiles(retainedSnapshot);
81-
List<Changelog> cleanedChangelogs = cleanLongLivedChangelogDataFiles(retainedSnapshot);
82-
List<Snapshot> cleanedTags = cleanTagsDataFiles(retainedSnapshot);
83-
Set<Long> cleanedIds = new HashSet<>();
84-
85-
// clean manifests
86-
// this can be used for snapshots and tags manifests cleaning both
87-
Set<String> manifestsSkippingSet = snapshotDeletion.manifestSkippingSet(retainedSnapshot);
88-
89-
for (Snapshot snapshot : cleanedSnapshots) {
90-
snapshotDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
91-
cleanedIds.add(snapshot.id());
92-
}
93-
94-
for (Changelog changelog : cleanedChangelogs) {
95-
changelogDeletion.cleanUnusedManifests(changelog, manifestsSkippingSet);
96-
cleanedIds.add(changelog.id());
56+
public void cleanLargerThan(Snapshot retainedSnapshot) {
57+
try {
58+
cleanSnapshots(retainedSnapshot);
59+
cleanLongLivedChangelogs(retainedSnapshot);
60+
cleanTags(retainedSnapshot);
61+
} catch (IOException e) {
62+
throw new RuntimeException(e);
9763
}
64+
}
9865

99-
for (Snapshot snapshot : cleanedTags) {
100-
if (cleanedIds.contains(snapshot.id())) {
101-
continue;
66+
public void createSnapshotFileIfNeeded(Snapshot taggedSnapshot) {
67+
// it is possible that the earliest snapshot is later than the rollback tag because of
68+
// snapshot expiration, in this case the `cleanLargerThan` method will delete all
69+
// snapshots, so we should write the tag file to snapshot directory and modify the
70+
// earliest hint
71+
if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
72+
try {
73+
fileIO.writeFile(
74+
snapshotManager.snapshotPath(taggedSnapshot.id()),
75+
taggedSnapshot.toJson(),
76+
false);
77+
snapshotManager.commitEarliestHint(taggedSnapshot.id());
78+
} catch (IOException e) {
79+
throw new UncheckedIOException(e);
10280
}
103-
tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
10481
}
10582
}
10683

107-
private List<Snapshot> updateLatestAndCleanSnapshotsDataFiles(Snapshot retainedSnapshot) {
84+
private void cleanSnapshots(Snapshot retainedSnapshot) throws IOException {
10885
long earliest =
10986
checkNotNull(
11087
snapshotManager.earliestSnapshotId(), "Cannot find earliest snapshot.");
11188
long latest =
11289
checkNotNull(snapshotManager.latestSnapshotId(), "Cannot find latest snapshot.");
11390

11491
// modify the latest hint
115-
try {
116-
snapshotManager.commitLatestHint(retainedSnapshot.id());
117-
} catch (IOException e) {
118-
throw new UncheckedIOException(e);
119-
}
120-
// delete snapshot files first, cannot be read now
92+
snapshotManager.commitLatestHint(retainedSnapshot.id());
93+
12194
// it is possible that some snapshots have been expired
122-
List<Snapshot> toBeCleaned = new ArrayList<>();
12395
long to = Math.max(earliest, retainedSnapshot.id() + 1);
12496
for (long i = latest; i >= to; i--) {
12597
// Ignore the non-existent snapshots
12698
if (snapshotManager.snapshotExists(i)) {
127-
toBeCleaned.add(snapshotManager.snapshot(i));
12899
snapshotManager.deleteSnapshot(i);
129100
}
130101
}
131-
132-
// delete data files of snapshots
133-
// don't worry about tag data files because file deletion methods won't throw exception
134-
// when deleting non-existing data files
135-
for (Snapshot snapshot : toBeCleaned) {
136-
snapshotDeletion.deleteAddedDataFiles(snapshot.deltaManifestList());
137-
if (snapshot.changelogManifestList() != null) {
138-
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
139-
}
140-
}
141-
142-
// delete directories
143-
snapshotDeletion.cleanEmptyDirectories();
144-
145-
return toBeCleaned;
146102
}
147103

148-
private List<Changelog> cleanLongLivedChangelogDataFiles(Snapshot retainedSnapshot) {
104+
private void cleanLongLivedChangelogs(Snapshot retainedSnapshot) throws IOException {
149105
Long earliest = changelogManager.earliestLongLivedChangelogId();
150106
Long latest = changelogManager.latestLongLivedChangelogId();
151107
if (earliest == null || latest == null) {
152-
return Collections.emptyList();
108+
return;
153109
}
154110

155111
// it is possible that some snapshots have been expired
@@ -160,73 +116,37 @@ private List<Changelog> cleanLongLivedChangelogDataFiles(Snapshot retainedSnapsh
160116
}
161117

162118
// modify the latest hint
163-
try {
164-
if (toBeCleaned.size() > 0) {
165-
if (to == earliest) {
166-
// all changelog has been cleaned, so we do not know the actual latest id
167-
// set to -1
168-
changelogManager.commitLongLivedChangelogLatestHint(-1);
169-
} else {
170-
changelogManager.commitLongLivedChangelogLatestHint(to - 1);
171-
}
119+
if (!toBeCleaned.isEmpty()) {
120+
if (to == earliest) {
121+
// all changelog has been cleaned, so we do not know the actual latest id
122+
// set to -1
123+
changelogManager.commitLongLivedChangelogLatestHint(-1);
124+
} else {
125+
changelogManager.commitLongLivedChangelogLatestHint(to - 1);
172126
}
173-
} catch (IOException e) {
174-
throw new UncheckedIOException(e);
175127
}
176128

177129
// delete data files of changelog
178130
for (Changelog changelog : toBeCleaned) {
179-
// delete changelog files first, cannot be read now
180131
fileIO.deleteQuietly(changelogManager.longLivedChangelogPath(changelog.id()));
181-
// clean the deleted file
182-
changelogDeletion.cleanUnusedDataFiles(changelog, manifestEntry -> false);
183132
}
184-
185-
// delete directories
186-
snapshotDeletion.cleanEmptyDirectories();
187-
188-
return toBeCleaned;
189133
}
190134

191-
private List<Snapshot> cleanTagsDataFiles(Snapshot retainedSnapshot) {
135+
private void cleanTags(Snapshot retainedSnapshot) {
192136
SortedMap<Snapshot, List<String>> tags = tagManager.tags();
193137
if (tags.isEmpty()) {
194-
return Collections.emptyList();
138+
return;
195139
}
196140

197141
List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
198-
List<Snapshot> toBeCleaned = new ArrayList<>();
199142

200143
// delete tag files
201144
for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
202145
Snapshot tag = taggedSnapshots.get(i);
203146
if (tag.id() <= retainedSnapshot.id()) {
204147
break;
205148
}
206-
toBeCleaned.add(tag);
207149
tags.get(tag).forEach(tagName -> fileIO.deleteQuietly(tagManager.tagPath(tagName)));
208150
}
209-
210-
// delete data files
211-
Predicate<ExpireFileEntry> dataFileSkipper = null;
212-
boolean success = true;
213-
try {
214-
dataFileSkipper = tagDeletion.dataFileSkipper(retainedSnapshot);
215-
} catch (Exception e) {
216-
LOG.info(
217-
"Skip cleaning data files for deleted tags due to failed to build skipping set.",
218-
e);
219-
success = false;
220-
}
221-
222-
if (success) {
223-
for (Snapshot s : toBeCleaned) {
224-
tagDeletion.cleanUnusedDataFiles(s, dataFileSkipper);
225-
}
226-
// delete directories
227-
tagDeletion.cleanEmptyDirectories();
228-
}
229-
230-
return toBeCleaned;
231151
}
232152
}

0 commit comments

Comments
 (0)