
Commit 1875582 ("rebase master")

1 parent 62902ff

2 files changed: +24, -10 lines

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java

Lines changed: 2 additions & 1 deletion
```diff
@@ -65,7 +65,8 @@ public LanceCommittable toCommittable(List<LanceWriteResult> lanceWriteResults)
     }
 
     @Override
-    public long commit(LanceCommittable committable) throws IOException {
+    public long commit(LanceCommittable committable, Map<String, String> snapshotProperties)
+        throws IOException {
         // TODO: store bucketLogEndOffsets in Lance transaction properties, see
         // https://github.com/lancedb/lance/issues/4181
         return LanceDatasetAdapter.appendFragments(config, committable.committable());
```
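The new `snapshotProperties` parameter carries the bucket log-end offsets down from the tiering committer, but the Lance committer cannot persist them yet (see the TODO above referencing lancedb/lance#4181), so the parameter is currently unused. A minimal sketch of how they might eventually be attached, assuming a hypothetical `appendFragments` overload that accepts transaction properties:

```java
// Sketch only: the three-argument appendFragments overload is hypothetical.
// Today the properties are dropped because Lance cannot store them yet.
@Override
public long commit(LanceCommittable committable, Map<String, String> snapshotProperties)
        throws IOException {
    // Once lancedb/lance#4181 is resolved, the bucket log-end offsets in
    // snapshotProperties could be written as Lance transaction properties.
    return LanceDatasetAdapter.appendFragments(
            config, committable.committable(), snapshotProperties);
}
```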

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java

Lines changed: 22 additions & 9 deletions
```diff
@@ -52,13 +52,13 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static com.alibaba.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
 import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
```
```diff
@@ -106,11 +106,21 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
         }
 
         Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
-        List<String> partitions =
-                isPartitioned ? Arrays.asList("p1", "p2", "p3") : Collections.singletonList(null);
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
         // first, write data
         for (int bucket = 0; bucket < bucketNum; bucket++) {
-            for (String partition : partitions) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
                 try (LakeWriter<LanceWriteResult> lakeWriter =
                         createLakeWriter(tablePath, bucket, partition, schema)) {
                     Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
```
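A side note on the new fixture map: the `new HashMap<Long, String>() {{ ... }}` double-brace idiom creates an anonymous `HashMap` subclass at each use site. A plainer construction of the same `partitionIdAndName` map would be (sketch with equivalent behavior, not part of the commit):

```java
// Equivalent construction without the anonymous-subclass idiom (sketch).
Map<Long, String> partitionIdAndName = new HashMap<>();
if (isPartitioned) {
    partitionIdAndName.put(1L, "p1");
    partitionIdAndName.put(2L, "p2");
    partitionIdAndName.put(3L, "p3");
} else {
    // Mirrors Collections.singletonMap(null, null) for the non-partitioned case.
    partitionIdAndName.put(null, null);
}
```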
```diff
@@ -119,6 +129,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
                     List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
                     List<LogRecord> expectRecords = writeAndExpectRecords.f1;
                     recordsByBucket.put(partitionBucket, expectRecords);
+                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L);
                     for (LogRecord logRecord : writtenRecords) {
                         lakeWriter.write(logRecord);
                     }
```
```diff
@@ -141,7 +152,9 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
             lanceCommittable =
                     committableSerializer.deserialize(
                             committableSerializer.getVersion(), serialized);
-            long snapshot = lakeCommitter.commit(lanceCommittable);
+            long snapshot =
+                    lakeCommitter.commit(
+                            lanceCommittable, toBucketOffsetsProperty(tableBucketOffsets));
             assertThat(snapshot).isEqualTo(1);
         }
```
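For reference, the call pattern the test now exercises: the write phase records the expected end offset per `TableBucket` (the constructor arguments appear to be table id, partition id, and bucket, matching the test), and `toBucketOffsetsProperty` converts that map into the `Map<String, String>` snapshot properties the committer accepts. A condensed usage sketch; the string encoding is internal to the helper and not documented here:

```java
// Usage sketch: collect per-bucket end offsets while writing, then convert
// them to snapshot properties at commit time. Do not rely on a particular
// key/value format; it is internal to toBucketOffsetsProperty.
Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
// Arguments presumed to be (tableId, partitionId, bucket), as in the test.
tableBucketOffsets.put(new TableBucket(0, 1L, 0), 10L);
long snapshot =
        lakeCommitter.commit(
                lanceCommittable, toBucketOffsetsProperty(tableBucketOffsets));
```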

```diff
@@ -153,7 +166,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
 
         // then, check data
         for (int bucket = 0; bucket < 3; bucket++) {
-            for (String partition : partitions) {
+            for (String partition : partitionIdAndName.values()) {
                 reader.loadNextBatch();
                 Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
                 List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket);
```
```diff
@@ -168,11 +181,11 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
         // use snapshot id 0 as the known snapshot id
         CommittedLakeSnapshot committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(0L);
         assertThat(committedLakeSnapshot).isNotNull();
-        Map<Tuple2<String, Integer>, Long> offsets = committedLakeSnapshot.getLogEndOffsets();
+        Map<Tuple2<Long, Integer>, Long> offsets = committedLakeSnapshot.getLogEndOffsets();
         for (int bucket = 0; bucket < 3; bucket++) {
-            for (String partition : partitions) {
+            for (Long partitionId : partitionIdAndName.keySet()) {
                 // we only write 10 records, so expected log offset should be 9
-                assertThat(offsets.get(Tuple2.of(partition, bucket))).isEqualTo(9);
+                assertThat(offsets.get(Tuple2.of(partitionId, bucket))).isEqualTo(9);
             }
         }
         assertThat(committedLakeSnapshot.getLakeSnapshotId()).isOne();
```
