Skip to content

Commit d9a7362

Browse files
committed
add test for txn property check
1 parent f19f649 commit d9a7362

File tree

3 files changed

+39
-12
lines changed

3 files changed

+39
-12
lines changed

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSna
104104
new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.get());
105105
String flussOffsetProperties =
106106
LanceDatasetAdapter.getTransactionProperties(
107-
config, Math.toIntExact(latestLakeSnapshotIdOfLake.get() + 1))
107+
config, latestLakeSnapshotIdOfLake.get() + 1)
108108
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
109109
for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
110110
BucketOffset bucketOffset = BucketOffsetJsonSerde.INSTANCE.deserialize(node);

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.arrow.vector.ipc.ArrowReader;
3737
import org.apache.arrow.vector.types.pojo.Schema;
3838

39+
import javax.annotation.Nullable;
40+
3941
import java.util.Collections;
4042
import java.util.List;
4143
import java.util.Map;
@@ -95,9 +97,12 @@ public static long commitAppend(
9597
}
9698
}
9799

98-
public static Map<String, String> getTransactionProperties(LanceConfig config, int version) {
100+
public static Map<String, String> getTransactionProperties(
101+
LanceConfig config, @Nullable Long version) {
99102
ReadOptions.Builder builder = new ReadOptions.Builder();
100-
builder.setVersion(version);
103+
if (version != null) {
104+
builder.setVersion(Math.toIntExact(version));
105+
}
101106
builder.setStorageOptions(LanceConfig.genStorageOptions(config));
102107
try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
103108
Transaction transaction = dataset.readTransaction().orElse(null);

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.config.Configuration;
2121
import com.alibaba.fluss.lake.lance.LanceConfig;
2222
import com.alibaba.fluss.lake.lance.testutils.FlinkLanceTieringTestBase;
23+
import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
2324
import com.alibaba.fluss.metadata.TableBucket;
2425
import com.alibaba.fluss.metadata.TablePath;
2526
import com.alibaba.fluss.row.InternalRow;
@@ -37,9 +38,12 @@
3738
import java.util.ArrayList;
3839
import java.util.Arrays;
3940
import java.util.Collections;
41+
import java.util.HashMap;
4042
import java.util.Iterator;
4143
import java.util.List;
44+
import java.util.Map;
4245

46+
import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
4347
import static com.alibaba.fluss.testutils.DataTestUtils.row;
4448
import static org.assertj.core.api.Assertions.assertThat;
4549

@@ -80,20 +84,38 @@ void testTiering() throws Exception {
8084
// note: we can't update log start offset for unaware bucket mode log table
8185
assertReplicaStatus(t1Bucket, 30);
8286

87+
LanceConfig config =
88+
LanceConfig.from(
89+
lanceConf.toMap(),
90+
Collections.emptyMap(),
91+
t1.getDatabaseName(),
92+
t1.getTableName());
93+
8394
// check data in lance
84-
checkDataInLanceAppendOnlyTable(t1, flussRows);
95+
checkDataInLanceAppendOnlyTable(config, flussRows);
96+
// check snapshot property in lance
97+
Map<String, String> properties =
98+
new HashMap<String, String>() {
99+
{
100+
put(
101+
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
102+
"[{\"bucket_id\":0,\"log_offset\":30}]");
103+
}
104+
};
105+
checkSnapshotPropertyInLance(config, properties);
85106

86107
jobClient.cancel().get();
87108
}
88109

89-
private void checkDataInLanceAppendOnlyTable(
90-
TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
91-
LanceConfig config =
92-
LanceConfig.from(
93-
lanceConf.toMap(),
94-
Collections.emptyMap(),
95-
tablePath.getDatabaseName(),
96-
tablePath.getTableName());
110+
private void checkSnapshotPropertyInLance(
111+
LanceConfig config, Map<String, String> expectedProperties) throws Exception {
112+
Map<String, String> transactionProperties =
113+
LanceDatasetAdapter.getTransactionProperties(config, null);
114+
assertThat(transactionProperties).isEqualTo(expectedProperties);
115+
}
116+
117+
private void checkDataInLanceAppendOnlyTable(LanceConfig config, List<InternalRow> expectedRows)
118+
throws Exception {
97119

98120
try (Dataset dataset =
99121
Dataset.open(

0 commit comments

Comments
 (0)