Commit e6efa54

upgrade lance version to utilize txn property

1 parent dc49678 · commit e6efa54

4 files changed: 26 additions, 25 deletions

fluss-lake/fluss-lake-lance/pom.xml
Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@
     <packaging>jar</packaging>

     <properties>
-        <lance.version>0.32.1</lance.version>
+        <lance.version>0.33.0</lance.version>
     </properties>

     <dependencies>

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java
Lines changed: 1 addition & 1 deletion

@@ -104,7 +104,7 @@ public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSna
                 new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.get());
         String flussOffsetProperties =
                 LanceDatasetAdapter.getTransactionProperties(
-                                config, Math.toIntExact(latestLakeSnapshotIdOfLake.get()))
+                                config, Math.toIntExact(latestLakeSnapshotIdOfLake.get() + 1))
                         .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
         for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
             BucketOffset bucketOffset = BucketOffsetJsonSerde.INSTANCE.deserialize(node);
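
Why the "+ 1": as the LanceDatasetAdapter change below shows, commitAppend reports the Fluss snapshot id as appendedDataset.version() - 1, so the Lance version that actually carries a snapshot's transaction properties is the snapshot id plus one. A minimal sketch of that round trip, not part of this commit; the helper name commitAndReadBack is hypothetical, and the imports/constant location are assumed to match the files in this diff:

    // Sketch only: the LanceDatasetAdapter calls are the ones shown in this commit;
    // the method name and where FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY lives are assumptions.
    static String commitAndReadBack(
            LanceConfig config, List<FragmentMetadata> fragments, Map<String, String> props) {
        // commitAppend records the Fluss snapshot id as appendedDataset.version() - 1 ...
        long snapshotId = LanceDatasetAdapter.commitAppend(config, fragments, props);
        // ... so the transaction that produced snapshot N sits at Lance version N + 1,
        // which is why getMissingLakeSnapshot now passes latestLakeSnapshotIdOfLake.get() + 1.
        return LanceDatasetAdapter.getTransactionProperties(
                        config, Math.toIntExact(snapshotId + 1))
                .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
    }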

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java
Lines changed: 17 additions & 10 deletions

@@ -36,6 +36,7 @@
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.types.pojo.Schema;

+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;

@@ -82,23 +83,29 @@ public static long commitAppend(
             LanceConfig config, List<FragmentMetadata> fragments, Map<String, String> properties) {
         String uri = config.getDatasetUri();
         ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
-        Dataset dataset = Dataset.open(allocator, uri, options);
-        Transaction transaction =
-                dataset.newTransactionBuilder()
-                        .operation(Append.builder().fragments(fragments).build())
-                        .transactionProperties(properties)
-                        .build();
-        try (Dataset appendedDataset = transaction.commit()) {
-            return appendedDataset.version() - 1;
+        try (Dataset dataset = Dataset.open(allocator, uri, options)) {
+            Transaction transaction =
+                    dataset.newTransactionBuilder()
+                            .operation(Append.builder().fragments(fragments).build())
+                            .transactionProperties(properties)
+                            .build();
+            try (Dataset appendedDataset = transaction.commit()) {
+                return appendedDataset.version() - 1;
+            }
         }
     }

     public static Map<String, String> getTransactionProperties(LanceConfig config, int version) {
         ReadOptions.Builder builder = new ReadOptions.Builder();
         builder.setVersion(version);
         builder.setStorageOptions(LanceConfig.genStorageOptions(config));
-        Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build());
-        dataset.readTransaction();
+        try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
+            Transaction transaction = dataset.readTransaction().orElse(null);
+            if (transaction != null) {
+                return transaction.transactionProperties();
+            }
+            return Collections.emptyMap();
+        }
     }

     public static Optional<Long> getVersion(LanceConfig config) {
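
The new getTransactionProperties opens the dataset in try-with-resources and unwraps the Optional returned by readTransaction(). An equivalent, slightly more compact formulation (a sketch, not part of the commit, assuming readTransaction() returns Optional<Transaction> as the diff implies):

    public static Map<String, String> getTransactionProperties(LanceConfig config, int version) {
        ReadOptions.Builder builder = new ReadOptions.Builder();
        builder.setVersion(version);
        builder.setStorageOptions(LanceConfig.genStorageOptions(config));
        // Same behaviour as the committed code: close the Dataset handle and fall back to an
        // empty map when the requested version carries no transaction.
        try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
            return dataset.readTransaction()
                    .map(Transaction::transactionProperties)
                    .orElse(Collections.emptyMap());
        }
    }

Either form also closes the opened Dataset, which the previous code in both methods never did.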

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
Lines changed: 7 additions & 13 deletions

@@ -93,7 +93,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
                         customProperties,
                         tablePath.getDatabaseName(),
                         tablePath.getTableName());
-        Schema schema = createTable(tablePath, isPartitioned, null, config);
+        Schema schema = createTable(config);

         List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
         SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =

@@ -118,7 +118,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
                             }
                         }
                         : Collections.singletonMap(null, null);
-        List<String> partitionKeys = new ArrayList<>(partitionIdAndName.values());
+        List<String> partitionKeys = isPartitioned ? List.of("c3") : null;
         Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
         // first, write data
         for (int bucket = 0; bucket < bucketNum; bucket++) {

@@ -196,8 +196,8 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
         Map<Tuple2<Long, Integer>, Long> offsets = committedLakeSnapshot.getLogEndOffsets();
         for (int bucket = 0; bucket < 3; bucket++) {
             for (Long partitionId : partitionIdAndName.keySet()) {
-                // we only write 10 records, so expected log offset should be 9
-                assertThat(offsets.get(Tuple2.of(partitionId, bucket))).isEqualTo(9);
+                // we only write 10 records, so expected log offset should be 10
+                assertThat(offsets.get(Tuple2.of(partitionId, bucket))).isEqualTo(10);
             }
         }
         assertThat(committedLakeSnapshot.getLakeSnapshotId()).isOne();

@@ -303,24 +303,18 @@ private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
         return Tuple2.of(logRecords, logRecords);
     }

-    private Schema createTable(
-            TablePath tablePath,
-            boolean isPartitioned,
-            @Nullable Integer numBuckets,
-            LanceConfig config)
-            throws Exception {
+    private Schema createTable(LanceConfig config) throws Exception {
         List<Schema.Column> columns = new ArrayList<>();
         columns.add(new Schema.Column("c1", DataTypes.INT()));
         columns.add(new Schema.Column("c2", DataTypes.STRING()));
         columns.add(new Schema.Column("c3", DataTypes.STRING()));
         Schema.Builder schemaBuilder = Schema.newBuilder().fromColumns(columns);
         Schema schema = schemaBuilder.build();
-        doCreateLanceTable(tablePath, schema, config);
+        doCreateLanceTable(schema, config);
         return schema;
     }

-    private void doCreateLanceTable(TablePath tablePath, Schema schema, LanceConfig config)
-            throws Exception {
+    private void doCreateLanceTable(Schema schema, LanceConfig config) throws Exception {
         WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
         LanceDatasetAdapter.createDataset(
                 config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
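
The 9 → 10 change in the assertion appears to reflect the end-offset convention rather than a different record count: the value reported per bucket is presumably the log end offset, i.e. the offset the next record would get, not the offset of the last written record. A tiny hypothetical illustration, not part of the test:

    // Hypothetical illustration: offsets are zero-based, records land at offsets 0..9.
    int recordCount = 10;
    long lastWrittenOffset = recordCount - 1;  // 9  -- what the old assertion checked
    long logEndOffset = recordCount;           // 10 -- what the updated assertion checks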
