
Commit dc49678

wip
1 parent 1238b3e commit dc49678

File tree

11 files changed, +120 -220 lines

fluss-lake/fluss-lake-lance/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@
     <packaging>jar</packaging>

     <properties>
-        <lance.version>0.26.1</lance.version>
+        <lance.version>0.32.1</lance.version>
     </properties>

     <dependencies>

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java

Lines changed: 0 additions & 19 deletions
@@ -26,31 +26,14 @@
 import com.alibaba.fluss.metadata.TablePath;

 import com.lancedb.lance.WriteParams;
-import org.apache.arrow.vector.types.TimeUnit;
-import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;

 import java.util.ArrayList;
 import java.util.List;

-import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
-
 /** A Lance implementation of {@link LakeCatalog}. */
 public class LanceLakeCatalog implements LakeCatalog {
-    private static final List<Field> SYSTEM_COLUMNS = new ArrayList<>();
-
-    static {
-        SYSTEM_COLUMNS.add(Field.nullable(BUCKET_COLUMN_NAME, new ArrowType.Int(32, true)));
-        SYSTEM_COLUMNS.add(Field.nullable(OFFSET_COLUMN_NAME, new ArrowType.Int(64, true)));
-        SYSTEM_COLUMNS.add(
-                Field.nullable(
-                        TIMESTAMP_COLUMN_NAME,
-                        new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)));
-    }
-
     private final Configuration options;

     public LanceLakeCatalog(Configuration config) {

@@ -78,8 +61,6 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
         fields.addAll(
                 LanceArrowUtils.toArrowSchema(tableDescriptor.getSchema().getRowType())
                         .getFields());
-        // add system metadata columns to schema
-        fields.addAll(SYSTEM_COLUMNS);
         try {
             LanceDatasetAdapter.createDataset(config.getDatasetUri(), new Schema(fields), params);
         } catch (RuntimeException e) {

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java

Lines changed: 2 additions & 11 deletions
@@ -19,9 +19,7 @@

 import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
 import com.alibaba.fluss.lake.lance.writers.ArrowFieldWriter;
-import com.alibaba.fluss.row.GenericRow;
 import com.alibaba.fluss.row.InternalRow;
-import com.alibaba.fluss.row.TimestampLtz;
 import com.alibaba.fluss.types.RowType;

 import org.apache.arrow.vector.FieldVector;

@@ -34,7 +32,6 @@ public class ArrowWriter {
      */
     private final ArrowFieldWriter<InternalRow>[] fieldWriters;

-    private static final int LAKE_LANCE_SYSTEM_COLUMNS = 3;
     private int recordsCount;
     private VectorSchemaRoot root;

@@ -56,16 +53,10 @@ public static ArrowWriter create(VectorSchemaRoot root, RowType rowType) {
     }

     /** Writes the specified row which is serialized into Arrow format. */
-    public void writeRow(InternalRow row, int bucket, long offset, long timestamp) {
-        int i;
-        for (i = 0; i < fieldWriters.length - LAKE_LANCE_SYSTEM_COLUMNS; i++) {
+    public void writeRow(InternalRow row) {
+        for (int i = 0; i < fieldWriters.length; i++) {
             fieldWriters[i].write(row, i, true);
         }
-        fieldWriters[i].write(GenericRow.of(bucket), 0, true);
-        i++;
-        fieldWriters[i].write(GenericRow.of(offset), 0, true);
-        i++;
-        fieldWriters[i].write(GenericRow.of(TimestampLtz.fromEpochMillis(timestamp)), 0, true);
         recordsCount++;
     }
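
Note: with the system columns gone, writeRow maps the incoming row's fields 1:1 onto the Arrow field writers. A minimal usage sketch under that assumption (the helper method, schema, and inputs are illustrative, not part of this commit):

    import java.util.List;

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.types.pojo.Schema;

    // Sketch only: arrowSchema is assumed to be derived from rowType
    // (e.g. via LanceArrowUtils.toArrowSchema).
    static void writeBatch(Schema arrowSchema, RowType rowType, List<InternalRow> rows) {
        try (RootAllocator allocator = new RootAllocator();
                VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
            ArrowWriter writer = ArrowWriter.create(root, rowType);
            for (InternalRow row : rows) {
                writer.writeRow(row); // user columns only; no bucket/offset/timestamp tail
            }
        }
    }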

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java

Lines changed: 2 additions & 9 deletions
@@ -17,7 +17,6 @@

 package com.alibaba.fluss.lake.lance.tiering;

-import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.record.LogRecord;
 import com.alibaba.fluss.types.RowType;

@@ -46,21 +45,15 @@ public class LanceArrowWriter extends ArrowReader {
     private final AtomicInteger count = new AtomicInteger(0);
     private final Semaphore writeToken;
     private final Semaphore loadToken;
-    private final int bucket;

     public LanceArrowWriter(
-            BufferAllocator allocator,
-            Schema schema,
-            int batchSize,
-            TableBucket tableBucket,
-            RowType rowType) {
+            BufferAllocator allocator, Schema schema, int batchSize, RowType rowType) {
         super(allocator);
         checkNotNull(schema);
         checkArgument(batchSize > 0);
         this.schema = schema;
         this.rowType = rowType;
         this.batchSize = batchSize;
-        this.bucket = tableBucket.getBucket();
         this.writeToken = new Semaphore(0);
         this.loadToken = new Semaphore(0);
     }

@@ -70,7 +63,7 @@ void write(LogRecord row) {
         try {
             // wait until prepareLoadNextBatch releases a write token
             writeToken.acquire();
-            arrowWriter.writeRow(row.getRow(), bucket, row.logOffset(), row.timestamp());
+            arrowWriter.writeRow(row.getRow());
             if (count.incrementAndGet() == batchSize) {
                 // notify loadNextBatch to take the batch
                 loadToken.release();
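
The write/load handshake itself is untouched by this commit; only the bucket argument disappears. For reference, a self-contained sketch of the two-semaphore batch handshake (only the token names mirror the Fluss code; everything else is illustrative):

    import java.util.concurrent.Semaphore;

    /** Producer blocks on writeToken per row; consumer blocks on loadToken per batch. */
    public class BatchHandshake {
        public static void main(String[] args) throws InterruptedException {
            final int batchSize = 4;
            final Semaphore writeToken = new Semaphore(0);
            final Semaphore loadToken = new Semaphore(0);

            Thread producer = new Thread(() -> {
                for (int i = 0; i < batchSize; i++) {
                    try {
                        writeToken.acquire(); // wait until the consumer asks for a row
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    System.out.println("wrote row " + i);
                }
                loadToken.release(); // batch is full; hand it to the consumer
            });
            producer.start();

            writeToken.release(batchSize); // prepareLoadNextBatch: allow one batch of writes
            loadToken.acquire();           // loadNextBatch: block until the batch is full
            System.out.println("batch of " + batchSize + " rows ready");
            producer.join();
        }
    }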

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java

Lines changed: 23 additions & 36 deletions
@@ -18,35 +18,33 @@
 package com.alibaba.fluss.lake.lance.tiering;

 import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.BucketOffset;
 import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
 import com.alibaba.fluss.lake.committer.LakeCommitter;
 import com.alibaba.fluss.lake.lance.LanceConfig;
 import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;

 import com.lancedb.lance.FragmentMetadata;
-import org.apache.arrow.vector.BigIntVector;
-import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.ArrowReader;

 import javax.annotation.Nullable;

 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;

-import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;

 /** Implementation of {@link LakeCommitter} for Lance. */
 public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult, LanceCommittable> {
     private final LanceConfig config;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

     public LanceLakeCommitter(Configuration options, TablePath tablePath) {
         this.config =

@@ -71,9 +69,8 @@ public LanceCommittable toCommittable(List<LanceWriteResult> lanceWriteResults)
     @Override
     public long commit(LanceCommittable committable, Map<String, String> snapshotProperties)
             throws IOException {
-        // TODO: store bucketLogEndOffsets in Lance transaction properties, see
-        // https://github.com/lancedb/lance/issues/4181
-        return LanceDatasetAdapter.appendFragments(config, committable.committable());
+        return LanceDatasetAdapter.commitAppend(
+                config, committable.committable(), snapshotProperties);
     }

     @Override

@@ -105,33 +102,23 @@ public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSna

         CommittedLakeSnapshot committedLakeSnapshot =
                 new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.get());
-
-        LinkedHashMap<Integer, Long> bucketEndOffset = new LinkedHashMap<>();
-
-        ArrowReader reader =
-                LanceDatasetAdapter.getArrowReader(
-                        LanceConfig.from(
-                                config.getOptions(),
-                                Collections.emptyMap(),
-                                config.getDatabaseName(),
-                                config.getTableName()),
-                        Arrays.asList(BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME),
-                        Collections.emptyList(),
-                        (int) (latestLakeSnapshotIdOfLake.get() + 1));
-        VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
-        while (reader.loadNextBatch()) {
-            IntVector bucketVector = (IntVector) readerRoot.getVector(BUCKET_COLUMN_NAME);
-            BigIntVector offsetVector = (BigIntVector) readerRoot.getVector(OFFSET_COLUMN_NAME);
-            for (int i = 0; i < bucketVector.getValueCount(); i++) {
-                if (!bucketEndOffset.containsKey(bucketVector.get(i))
-                        || bucketEndOffset.get(bucketVector.get(i)) < offsetVector.get(i)) {
-                    bucketEndOffset.put(bucketVector.get(i), offsetVector.get(i));
-                }
+        String flussOffsetProperties =
+                LanceDatasetAdapter.getTransactionProperties(
+                                config, Math.toIntExact(latestLakeSnapshotIdOfLake.get()))
+                        .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+        for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
+            BucketOffset bucketOffset = BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+            if (bucketOffset.getPartitionId() != null) {
+                committedLakeSnapshot.addPartitionBucket(
+                        bucketOffset.getPartitionId(),
+                        bucketOffset.getPartitionQualifiedName(),
+                        bucketOffset.getBucket(),
+                        bucketOffset.getLogOffset());
+            } else {
+                committedLakeSnapshot.addBucket(
+                        bucketOffset.getBucket(), bucketOffset.getLogOffset());
             }
         }
-        for (Map.Entry<Integer, Long> entry : bucketEndOffset.entrySet()) {
-            committedLakeSnapshot.addBucket(entry.getKey(), entry.getValue());
-        }
         return committedLakeSnapshot;
     }
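
getMissingLakeSnapshot now recovers per-bucket end offsets from a single JSON property on the committed Lance transaction instead of scanning the bucket/offset system columns. A sketch of that parsing step (the payload shape and field names are illustrative; the real encoding is defined by BucketOffsetJsonSerde, which this diff does not show):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class OffsetPropertySketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical value stored under FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY.
            String json = "[{\"bucket\":0,\"log_offset\":42},{\"bucket\":1,\"log_offset\":17}]";
            ObjectMapper mapper = new ObjectMapper();
            for (JsonNode node : mapper.readTree(json)) {
                System.out.printf(
                        "bucket=%d endOffset=%d%n",
                        node.get("bucket").asInt(), node.get("log_offset").asLong());
            }
        }
    }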

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java

Lines changed: 1 addition & 19 deletions
@@ -23,9 +23,6 @@
 import com.alibaba.fluss.lake.writer.LakeWriter;
 import com.alibaba.fluss.lake.writer.WriterInitContext;
 import com.alibaba.fluss.record.LogRecord;
-import com.alibaba.fluss.types.DataField;
-import com.alibaba.fluss.types.DataTypes;
-import com.alibaba.fluss.types.RowType;

 import com.lancedb.lance.FragmentMetadata;
 import com.lancedb.lance.WriteParams;

@@ -38,10 +35,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;

-import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
-
 /** Implementation of {@link LakeWriter} for Lance. */
 public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
     private final LanceArrowWriter arrowWriter;

@@ -61,20 +54,9 @@ public LanceLakeWriter(Configuration options, WriterInitContext writerInitContex
             throw new IOException("Fail to get dataset " + config.getDatasetUri() + " in Lance.");
         }

-        RowType.Builder rowTypeBuilder = RowType.builder();
-        for (DataField field : writerInitContext.schema().getRowType().getFields()) {
-            rowTypeBuilder.field(field.getName(), field.getType());
-        }
-        rowTypeBuilder.field(BUCKET_COLUMN_NAME, DataTypes.INT());
-        rowTypeBuilder.field(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
-        rowTypeBuilder.field(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_LTZ());
-
         this.arrowWriter =
                 LanceDatasetAdapter.getArrowWriter(
-                        schema.get(),
-                        batchSize,
-                        writerInitContext.tableBucket(),
-                        rowTypeBuilder.build());
+                        schema.get(), batchSize, writerInitContext.schema().getRowType());

         WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
         Callable<List<FragmentMetadata>> fragmentCreator =

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java

Lines changed: 28 additions & 36 deletions
@@ -19,28 +19,25 @@

 import com.alibaba.fluss.lake.lance.LanceConfig;
 import com.alibaba.fluss.lake.lance.tiering.LanceArrowWriter;
-import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.types.RowType;

 import com.lancedb.lance.Dataset;
 import com.lancedb.lance.Fragment;
 import com.lancedb.lance.FragmentMetadata;
 import com.lancedb.lance.FragmentOperation;
 import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
 import com.lancedb.lance.WriteParams;
-import com.lancedb.lance.ipc.ColumnOrdering;
-import com.lancedb.lance.ipc.ScanOptions;
+import com.lancedb.lance.operation.Append;
 import org.apache.arrow.c.ArrowArrayStream;
 import org.apache.arrow.c.Data;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.types.pojo.Schema;

-import javax.annotation.Nullable;
-
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;

 /** Lance dataset API adapter. */
@@ -81,6 +78,29 @@ public static long appendFragments(LanceConfig config, List<FragmentMetadata> fr
         }
     }

+    public static long commitAppend(
+            LanceConfig config, List<FragmentMetadata> fragments, Map<String, String> properties) {
+        String uri = config.getDatasetUri();
+        ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
+        // open in try-with-resources so the dataset handle is released after the commit
+        try (Dataset dataset = Dataset.open(allocator, uri, options)) {
+            Transaction transaction =
+                    dataset.newTransactionBuilder()
+                            .operation(Append.builder().fragments(fragments).build())
+                            .transactionProperties(properties)
+                            .build();
+            try (Dataset appendedDataset = transaction.commit()) {
+                return appendedDataset.version() - 1;
+            }
+        }
+    }
+
+    public static Map<String, String> getTransactionProperties(LanceConfig config, int version) {
+        ReadOptions.Builder builder = new ReadOptions.Builder();
+        builder.setVersion(version);
+        builder.setStorageOptions(LanceConfig.genStorageOptions(config));
+        try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
+            // return the properties recorded on the transaction that produced this version;
+            // falls back to an empty map if Lance has no transaction for it
+            return dataset.readTransaction()
+                    .map(Transaction::transactionProperties)
+                    .orElseGet(java.util.Collections::emptyMap);
+        }
+    }
+
     public static Optional<Long> getVersion(LanceConfig config) {
         String uri = config.getDatasetUri();
         ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
@@ -93,36 +113,8 @@ public static Optional<Long> getVersion(LanceConfig config) {
         }
     }

-    public static ArrowReader getArrowReader(LanceConfig config) {
-        return getArrowReader(config, Collections.emptyList(), Collections.emptyList(), null);
-    }
-
-    public static ArrowReader getArrowReader(
-            LanceConfig config,
-            List<String> columns,
-            List<ColumnOrdering> columnOrderings,
-            @Nullable Integer version) {
-        ReadOptions.Builder builder = new ReadOptions.Builder();
-        builder.setStorageOptions(LanceConfig.genStorageOptions(config));
-        if (version != null) {
-            builder.setVersion(version);
-        }
-        try (Dataset datasetRead =
-                Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
-            ScanOptions.Builder scanOptionBuilder = new ScanOptions.Builder();
-            if (!columns.isEmpty()) {
-                scanOptionBuilder.columns(columns);
-            }
-            if (!columnOrderings.isEmpty()) {
-                scanOptionBuilder.setColumnOrderings(columnOrderings);
-            }
-            return datasetRead.newScan(scanOptionBuilder.build()).scanBatches();
-        }
-    }
-
-    public static LanceArrowWriter getArrowWriter(
-            Schema schema, int batchSize, TableBucket tableBucket, RowType rowType) {
-        return new LanceArrowWriter(allocator, schema, batchSize, tableBucket, rowType);
+    public static LanceArrowWriter getArrowWriter(Schema schema, int batchSize, RowType rowType) {
+        return new LanceArrowWriter(allocator, schema, batchSize, rowType);
     }

     public static List<FragmentMetadata> createFragment(
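
Taken together, commitAppend and getTransactionProperties form the round trip that replaces the old column scan: snapshot properties ride along with the Append transaction and are read back from the dataset version it produced. A sketch of that flow (the method names come from this diff; the config/fragment setup and the property key are assumed):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.lancedb.lance.FragmentMetadata;

    /** Sketch: attach offsets at commit time, read them back by dataset version. */
    public class RoundTripSketch {
        static long commitWithOffsets(LanceConfig config, List<FragmentMetadata> fragments) {
            Map<String, String> props = new HashMap<>();
            // illustrative key; Fluss uses FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY
            props.put("fluss.offsets", "[...]");
            return LanceDatasetAdapter.commitAppend(config, fragments, props);
        }

        static Map<String, String> readBack(LanceConfig config, int version) {
            // version: the Lance dataset version whose transaction carries the properties
            return LanceDatasetAdapter.getTransactionProperties(config, version);
        }
    }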
