Skip to content

Commit ec62ec0

Browse files
committed
iterate dataset versions from end to start
1 parent 425f098 commit ec62ec0

File tree

1 file changed

+10
-11
lines changed

1 file changed

+10
-11
lines changed

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
/** Implementation of {@link LakeCommitter} for Lance. */
5151
public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult, LanceCommittable> {
5252
private final LanceConfig config;
53-
private static String commitUser = "commit-user";
53+
private static final String committerName = "commit-user";
5454
private final RootAllocator allocator = new RootAllocator();
5555
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
5656

@@ -78,7 +78,7 @@ public LanceCommittable toCommittable(List<LanceWriteResult> lanceWriteResults)
7878
public long commit(LanceCommittable committable, Map<String, String> snapshotProperties)
7979
throws IOException {
8080
Map<String, String> properties = new HashMap<>(snapshotProperties);
81-
properties.put(commitUser, FLUSS_LAKE_TIERING_COMMIT_USER);
81+
properties.put(committerName, FLUSS_LAKE_TIERING_COMMIT_USER);
8282
return LanceDatasetAdapter.commitAppend(config, committable.committable(), properties);
8383
}
8484

@@ -133,22 +133,21 @@ public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSna
133133
@Nullable
134134
private Transaction getCommittedLatestSnapshotOfLake(String commitUser) {
135135
Transaction latestFlussSnapshot = null;
136-
137136
ReadOptions.Builder builder = new ReadOptions.Builder();
138137
builder.setStorageOptions(LanceConfig.genStorageOptions(config));
139138
try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
140-
for (Version version : dataset.listVersions()) {
139+
List<Version> versions = dataset.listVersions();
140+
for (int i = versions.size() - 1; i >= 0; i--) {
141+
Version version = versions.get(i);
141142
builder.setVersion((int) version.getId());
142-
try (Dataset datasetVersion =
143+
try (Dataset datasetRead =
143144
Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
144-
Transaction transaction = datasetVersion.readTransaction().orElse(null);
145+
Transaction transaction = datasetRead.readTransaction().orElse(null);
145146
if (transaction != null
146147
&& commitUser.equals(
147-
transaction.transactionProperties().get(commitUser))) {
148-
if (latestFlussSnapshot == null
149-
|| transaction.readVersion() > latestFlussSnapshot.readVersion()) {
150-
latestFlussSnapshot = transaction;
151-
}
148+
transaction.transactionProperties().get(committerName))) {
149+
latestFlussSnapshot = transaction;
150+
break;
152151
}
153152
}
154153
}

0 commit comments

Comments
 (0)