|
44 | 44 |
|
45 | 45 | import java.io.IOException; |
46 | 46 | import java.util.Arrays; |
| 47 | +import java.util.Collections; |
47 | 48 | import java.util.List; |
48 | 49 | import java.util.Map; |
49 | 50 | import java.util.stream.Collectors; |
|
55 | 56 |
|
56 | 57 | /** Implementation of {@link LakeCommitter} for Iceberg. */ |
57 | 58 | public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, IcebergCommittable> { |
| 59 | + private static final String COMMITTER_USER = "commit-user"; |
58 | 60 |
|
59 | 61 | private final Catalog icebergCatalog; |
60 | 62 | private final Table icebergTable; |
@@ -139,7 +141,7 @@ public long commit(IcebergCommittable committable, Map<String, String> snapshotP |
139 | 141 |
|
140 | 142 | private void addFlussProperties( |
141 | 143 | SnapshotUpdate<?> operation, Map<String, String> snapshotProperties) { |
142 | | - operation.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER); |
| 144 | + operation.set(COMMITTER_USER, FLUSS_LAKE_TIERING_COMMIT_USER); |
143 | 145 | for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) { |
144 | 146 | operation.set(entry.getKey(), entry.getValue()); |
145 | 147 | } |
@@ -173,9 +175,19 @@ public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSna |
173 | 175 | } |
174 | 176 |
|
175 | 177 | // Check if there's a gap between Fluss and Iceberg snapshots |
176 | | - if (latestLakeSnapshotIdOfFluss != null |
177 | | - && latestLakeSnapshot.snapshotId() <= latestLakeSnapshotIdOfFluss) { |
178 | | - return null; |
| 178 | + if (latestLakeSnapshotIdOfFluss != null) { |
| 179 | + Snapshot latestLakeSnapshotOfFluss = icebergTable.snapshot(latestLakeSnapshotIdOfFluss); |
| 180 | + if (latestLakeSnapshotOfFluss == null) { |
| 181 | + throw new IllegalStateException( |
| 182 | + "Referenced Fluss snapshot " |
| 183 | + + latestLakeSnapshotIdOfFluss |
| 184 | + + " not found in Iceberg table"); |
| 185 | + } |
| 186 | + // note: we need to use sequence number to compare, |
| 187 | + // we can't use snapshot id as the snapshot id is not ordered |
| 188 | + if (latestLakeSnapshot.sequenceNumber() <= latestLakeSnapshotOfFluss.sequenceNumber()) { |
| 189 | + return null; |
| 190 | + } |
179 | 191 | } |
180 | 192 |
|
181 | 193 | CommittedLakeSnapshot committedLakeSnapshot = |
@@ -237,20 +249,17 @@ private Snapshot getCommittedLatestSnapshotOfLake(String commitUser) { |
237 | 249 | icebergTable.refresh(); |
238 | 250 |
|
239 | 251 | // Find the latest snapshot committed by Fluss |
240 | | - Iterable<Snapshot> snapshots = icebergTable.snapshots(); |
241 | | - Snapshot latestFlussSnapshot = null; |
242 | | - |
| 252 | + List<Snapshot> snapshots = (List<Snapshot>) icebergTable.snapshots(); |
| 253 | + // snapshots() returns snapshots in chronological order (oldest to newest), Reverse to find |
| 254 | + // most recent snapshot committed by Fluss |
| 255 | + Collections.reverse(snapshots); |
243 | 256 | for (Snapshot snapshot : snapshots) { |
244 | 257 | Map<String, String> summary = snapshot.summary(); |
245 | | - if (summary != null && commitUser.equals(summary.get("commit-user"))) { |
246 | | - if (latestFlussSnapshot == null |
247 | | - || snapshot.snapshotId() > latestFlussSnapshot.snapshotId()) { |
248 | | - latestFlussSnapshot = snapshot; |
249 | | - } |
| 258 | + if (summary != null && commitUser.equals(summary.get(COMMITTER_USER))) { |
| 259 | + return snapshot; |
250 | 260 | } |
251 | 261 | } |
252 | | - |
253 | | - return latestFlussSnapshot; |
| 262 | + return null; |
254 | 263 | } |
255 | 264 |
|
256 | 265 | /** A {@link Listener} to listen the iceberg create snapshot event. */ |
|
0 commit comments