Skip to content

Commit 98f7e74

Browse files
committed
commit snapshot version from 2
1 parent ec62ec0 commit 98f7e74

File tree

4 files changed

+22
-29
lines changed

4 files changed

+22
-29
lines changed

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.lancedb.lance.Transaction;
3535
import com.lancedb.lance.Version;
3636
import org.apache.arrow.memory.RootAllocator;
37+
import org.apache.commons.lang3.tuple.ImmutablePair;
3738

3839
import javax.annotation.Nullable;
3940

@@ -93,25 +94,26 @@ public void abort(LanceCommittable committable) throws IOException {
9394
@Override
9495
public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSnapshotIdOfFluss)
9596
throws IOException {
96-
Transaction latestLakeSnapshotIdOfLake =
97+
ImmutablePair<Version, Transaction> latestLakeSnapshotIdOfLake =
9798
getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
9899

99-
if (latestLakeSnapshotIdOfLake == null) {
100+
if (latestLakeSnapshotIdOfLake.equals(ImmutablePair.nullPair())) {
100101
return null;
101102
}
102103

103104
// we get the latest snapshot committed by fluss,
104105
// but the latest snapshot is not greater than latestLakeSnapshotIdOfFluss, no any missing
105106
// snapshot, return directly
106107
if (latestLakeSnapshotIdOfFluss != null
107-
&& latestLakeSnapshotIdOfLake.readVersion() <= latestLakeSnapshotIdOfFluss) {
108+
&& latestLakeSnapshotIdOfLake.getLeft().getId() <= latestLakeSnapshotIdOfFluss) {
108109
return null;
109110
}
110111

111112
CommittedLakeSnapshot committedLakeSnapshot =
112-
new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.readVersion());
113+
new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.getLeft().getId());
113114
String flussOffsetProperties =
114115
latestLakeSnapshotIdOfLake
116+
.getRight()
115117
.transactionProperties()
116118
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
117119
for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
@@ -131,8 +133,8 @@ public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSna
131133
}
132134

133135
@Nullable
134-
private Transaction getCommittedLatestSnapshotOfLake(String commitUser) {
135-
Transaction latestFlussSnapshot = null;
136+
private ImmutablePair<Version, Transaction> getCommittedLatestSnapshotOfLake(
137+
String commitUser) {
136138
ReadOptions.Builder builder = new ReadOptions.Builder();
137139
builder.setStorageOptions(LanceConfig.genStorageOptions(config));
138140
try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
@@ -146,13 +148,12 @@ private Transaction getCommittedLatestSnapshotOfLake(String commitUser) {
146148
if (transaction != null
147149
&& commitUser.equals(
148150
transaction.transactionProperties().get(committerName))) {
149-
latestFlussSnapshot = transaction;
150-
break;
151+
return ImmutablePair.of(version, transaction);
151152
}
152153
}
153154
}
154155
}
155-
return latestFlussSnapshot;
156+
return ImmutablePair.nullPair();
156157
}
157158

158159
@Override

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -224,16 +224,7 @@ protected ArrowType defaultMethod(DataType dataType) {
224224
}
225225

226226
private static int getPrecision(DecimalVector decimalVector) {
227-
int precision = -1;
228-
try {
229-
java.lang.reflect.Field precisionField =
230-
decimalVector.getClass().getDeclaredField("precision");
231-
precisionField.setAccessible(true);
232-
precision = (int) precisionField.get(decimalVector);
233-
} catch (NoSuchFieldException | IllegalAccessException e) {
234-
// should not happen, ignore
235-
}
236-
return precision;
227+
return decimalVector.getPrecision();
237228
}
238229

239230
public static ArrowFieldWriter<InternalRow> createArrowFieldWriter(

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ public static long commitAppend(
6969
.transactionProperties(properties)
7070
.build();
7171
try (Dataset appendedDataset = transaction.commit()) {
72-
// lance dataset version starts from 1
73-
return appendedDataset.version() - 1;
72+
// note: lance dataset version starts from 1
73+
return appendedDataset.version();
7474
}
7575
}
7676
}

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
105105
try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
106106
createLakeCommitter(tablePath)) {
107107
// should no any missing snapshot
108-
assertThat(lakeCommitter.getMissingLakeSnapshot(1L)).isNull();
108+
assertThat(lakeCommitter.getMissingLakeSnapshot(2L)).isNull();
109109
}
110110

111111
Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
@@ -161,7 +161,8 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
161161
lanceCommittable,
162162
toBucketOffsetsProperty(
163163
tableBucketOffsets, partitionIdAndName, partitionKeys));
164-
assertThat(snapshot).isEqualTo(1);
164+
// lance dataset version starts from 1
165+
assertThat(snapshot).isEqualTo(2);
165166
}
166167

167168
try (Dataset dataset =
@@ -188,8 +189,8 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
188189
// then, let's verify getMissingLakeSnapshot works
189190
try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
190191
createLakeCommitter(tablePath)) {
191-
// use snapshot id 0 as the known snapshot id
192-
CommittedLakeSnapshot committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(0L);
192+
// use snapshot id 1 as the known snapshot id
193+
CommittedLakeSnapshot committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(1L);
193194
assertThat(committedLakeSnapshot).isNotNull();
194195
Map<Tuple2<Long, Integer>, Long> offsets = committedLakeSnapshot.getLogEndOffsets();
195196
for (int bucket = 0; bucket < 3; bucket++) {
@@ -198,16 +199,16 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
198199
assertThat(offsets.get(Tuple2.of(partitionId, bucket))).isEqualTo(10);
199200
}
200201
}
201-
assertThat(committedLakeSnapshot.getLakeSnapshotId()).isOne();
202+
assertThat(committedLakeSnapshot.getLakeSnapshotId()).isEqualTo(2L);
202203

203204
// use null as the known snapshot id
204205
CommittedLakeSnapshot committedLakeSnapshot2 =
205206
lakeCommitter.getMissingLakeSnapshot(null);
206207
assertThat(committedLakeSnapshot2).isEqualTo(committedLakeSnapshot);
207208

208-
// use snapshot id 1 as the known snapshot id
209-
committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(1L);
210-
// no any missing committed offset since the latest snapshot is 1L
209+
// use snapshot id 2 as the known snapshot id
210+
committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(2L);
211+
// no any missing committed offset since the latest snapshot is 2L
211212
assertThat(committedLakeSnapshot).isNull();
212213
}
213214
}

0 commit comments

Comments
 (0)