
Commit 9cdc9b7

[lake/lance] Lance lake writer and committer implementation (apache#1441)

1 parent: f9e875d
36 files changed: +2,497 −112 lines

fluss-lake/fluss-lake-lance/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@
     <packaging>jar</packaging>
 
     <properties>
-        <lance.version>0.26.1</lance.version>
+        <lance.version>0.33.0</lance.version>
     </properties>
 
     <dependencies>

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java

Lines changed: 11 additions & 43 deletions
@@ -32,17 +32,9 @@ public class LanceConfig implements Serializable {
 
     private static final String block_size = "block_size";
     private static final String version = "version";
-    private static final String index_cache_size = "index_cache_size";
-    private static final String metadata_cache_size = "metadata_cache_size";
     private static final String max_row_per_file = "max_row_per_file";
     private static final String max_rows_per_group = "max_rows_per_group";
     private static final String max_bytes_per_file = "max_bytes_per_file";
-    private static final String ak = "access_key_id";
-    private static final String sk = "secret_access_key";
-    private static final String endpoint = "aws_endpoint";
-    private static final String region = "aws_region";
-    private static final String virtual_hosted_style = "virtual_hosted_style_request";
-    private static final String allow_http = "allow_http";
     private static final String batch_size = "batch_size";
     private static final String warehouse = "warehouse";
 
@@ -64,11 +56,17 @@ public LanceConfig(
     }
 
     public static LanceConfig from(
-            Map<String, String> properties, String databaseName, String tableName) {
-        if (!properties.containsKey(warehouse)) {
+            Map<String, String> clusterConf,
+            Map<String, String> tableCustomProperties,
+            String databaseName,
+            String tableName) {
+        if (!clusterConf.containsKey(warehouse)) {
             throw new IllegalArgumentException("Missing required option " + warehouse);
         }
-        return new LanceConfig(databaseName, tableName, properties.get(warehouse), properties);
+        Map<String, String> options = new HashMap<>();
+        options.putAll(clusterConf);
+        options.putAll(tableCustomProperties);
+        return new LanceConfig(databaseName, tableName, clusterConf.get(warehouse), options);
     }
 
     public static int getBatchSize(LanceConfig config) {
@@ -83,14 +81,6 @@ public Map<String, String> getOptions() {
         return options;
     }
 
-    public String getDatabaseName() {
-        return databaseName;
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
     public String getDatasetUri() {
         return datasetUri;
     }
@@ -104,12 +94,6 @@ public static ReadOptions genReadOptionFromConfig(LanceConfig config) {
         if (maps.containsKey(version)) {
             builder.setVersion(Integer.parseInt(maps.get(version)));
         }
-        if (maps.containsKey(index_cache_size)) {
-            builder.setIndexCacheSize(Integer.parseInt(maps.get(index_cache_size)));
-        }
-        if (maps.containsKey(metadata_cache_size)) {
-            builder.setMetadataCacheSize(Integer.parseInt(maps.get(metadata_cache_size)));
-        }
         builder.setStorageOptions(genStorageOptions(config));
         return builder.build();
     }
@@ -130,24 +114,8 @@ public static WriteParams genWriteParamsFromConfig(LanceConfig config) {
         return builder.build();
     }
 
-    private static Map<String, String> genStorageOptions(LanceConfig config) {
-        Map<String, String> storageOptions = new HashMap<>();
-        Map<String, String> maps = config.getOptions();
-        if (maps.containsKey(ak) && maps.containsKey(sk) && maps.containsKey(endpoint)) {
-            storageOptions.put(ak, maps.get(ak));
-            storageOptions.put(sk, maps.get(sk));
-            storageOptions.put(endpoint, maps.get(endpoint));
-        }
-        if (maps.containsKey(region)) {
-            storageOptions.put(region, maps.get(region));
-        }
-        if (maps.containsKey(virtual_hosted_style)) {
-            storageOptions.put(virtual_hosted_style, maps.get(virtual_hosted_style));
-        }
-        if (maps.containsKey(allow_http)) {
-            storageOptions.put(allow_http, maps.get(allow_http));
-        }
-        return storageOptions;
+    public static Map<String, String> genStorageOptions(LanceConfig config) {
+        return config.getOptions();
     }
 
     @Override
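
For reference, a minimal usage sketch of the reworked factory method. The option values below are hypothetical; the point is that tableCustomProperties is merged after clusterConf, so per-table custom properties win on key collisions, and genStorageOptions now passes the merged map straight through to Lance as storage options.

    import java.util.HashMap;
    import java.util.Map;

    class LanceConfigExample {
        static LanceConfig build() {
            Map<String, String> clusterConf = new HashMap<>();
            clusterConf.put("warehouse", "s3://bucket/lance"); // required cluster-level option
            clusterConf.put("batch_size", "1024");

            Map<String, String> tableProps = new HashMap<>();
            tableProps.put("batch_size", "256"); // per-table value wins on collision

            // config.getOptions() holds batch_size=256 plus the warehouse entry
            return LanceConfig.from(clusterConf, tableProps, "my_db", "my_table");
        }
    }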

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java

Lines changed: 4 additions & 20 deletions
@@ -26,31 +26,14 @@
 import com.alibaba.fluss.metadata.TablePath;
 
 import com.lancedb.lance.WriteParams;
-import org.apache.arrow.vector.types.TimeUnit;
-import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import java.util.ArrayList;
 import java.util.List;
 
-import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
-
 /** A Lance implementation of {@link LakeCatalog}. */
 public class LanceLakeCatalog implements LakeCatalog {
-    private static final List<Field> SYSTEM_COLUMNS = new ArrayList<>();
-
-    static {
-        SYSTEM_COLUMNS.add(Field.nullable(BUCKET_COLUMN_NAME, new ArrowType.Int(32, true)));
-        SYSTEM_COLUMNS.add(Field.nullable(OFFSET_COLUMN_NAME, new ArrowType.Int(64, true)));
-        SYSTEM_COLUMNS.add(
-                Field.nullable(
-                        TIMESTAMP_COLUMN_NAME,
-                        new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)));
-    }
-
     private final Configuration options;
 
     public LanceLakeCatalog(Configuration config) {
@@ -67,16 +50,17 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
 
         LanceConfig config =
                 LanceConfig.from(
-                        options.toMap(), tablePath.getDatabaseName(), tablePath.getTableName());
+                        options.toMap(),
+                        tableDescriptor.getCustomProperties(),
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
         WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
 
         List<Field> fields = new ArrayList<>();
         // set schema
         fields.addAll(
                 LanceArrowUtils.toArrowSchema(tableDescriptor.getSchema().getRowType())
                         .getFields());
-        // add system metadata columns to schema
-        fields.addAll(SYSTEM_COLUMNS);
        try {
            LanceDatasetAdapter.createDataset(config.getDatasetUri(), new Schema(fields), params);
        } catch (RuntimeException e) {
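
A hypothetical sketch of what this means for table creation. The builder method names below are assumed, not taken from this diff; the effect shown is that per-table Lance options now flow into WriteParams, and the created dataset carries only the user schema, without the bucket/offset/timestamp system columns.

    // Illustrative only: schema is a com.alibaba.fluss.metadata.Schema defined elsewhere.
    TableDescriptor descriptor =
            TableDescriptor.builder()
                    .schema(schema)
                    .customProperty("max_rows_per_group", "4096") // forwarded into WriteParams
                    .build();
    catalog.createTable(TablePath.of("my_db", "my_table"), descriptor);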

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java

Lines changed: 2 additions & 1 deletion
@@ -20,6 +20,7 @@
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.lake.lakestorage.LakeStorage;
 import com.alibaba.fluss.lake.lance.tiering.LanceCommittable;
+import com.alibaba.fluss.lake.lance.tiering.LanceLakeTieringFactory;
 import com.alibaba.fluss.lake.lance.tiering.LanceWriteResult;
 import com.alibaba.fluss.lake.source.LakeSource;
 import com.alibaba.fluss.lake.writer.LakeTieringFactory;
@@ -35,7 +36,7 @@ public LanceLakeStorage(Configuration configuration) {
 
     @Override
     public LakeTieringFactory<LanceWriteResult, LanceCommittable> createLakeTieringFactory() {
-        throw new UnsupportedOperationException("Not implemented");
+        return new LanceLakeTieringFactory(config);
     }
 
     @Override
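
With this change the storage hands back a working tiering factory instead of throwing. A minimal sketch, assuming a Fluss Configuration that already carries the required warehouse option (the setter name is an assumption):

    Configuration conf = new Configuration();
    conf.setString("warehouse", "s3://bucket/lance"); // assumed setter; key as in LanceConfig
    LanceLakeStorage storage = new LanceLakeStorage(conf);
    LakeTieringFactory<LanceWriteResult, LanceCommittable> tieringFactory =
            storage.createLakeTieringFactory();
    // The tiering service then uses this factory to create lake writers and committers.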

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java

Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.lake.lance.writers.ArrowFieldWriter;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/** An Arrow writer for {@link InternalRow}. */
+public class ArrowWriter {
+    private final VectorSchemaRoot root;
+
+    private final ArrowFieldWriter<InternalRow>[] fieldWriters;
+
+    private int recordsCount;
+
+    /**
+     * Writer which serializes the Fluss rows to Arrow record batches.
+     *
+     * @param fieldWriters An array of writers which are responsible for the serialization of each
+     *     column of the rows
+     * @param root Container that holds a set of vectors for the rows
+     */
+    public ArrowWriter(ArrowFieldWriter<InternalRow>[] fieldWriters, VectorSchemaRoot root) {
+        this.fieldWriters = fieldWriters;
+        this.root = root;
+    }
+
+    public static ArrowWriter create(VectorSchemaRoot root, RowType rowType) {
+        ArrowFieldWriter<InternalRow>[] fieldWriters =
+                new ArrowFieldWriter[root.getFieldVectors().size()];
+        for (int i = 0; i < fieldWriters.length; i++) {
+            FieldVector fieldVector = root.getVector(i);
+
+            fieldWriters[i] =
+                    LanceArrowUtils.createArrowFieldWriter(fieldVector, rowType.getTypeAt(i));
+        }
+        return new ArrowWriter(fieldWriters, root);
+    }
+
+    /** Writes the specified row which is serialized into Arrow format. */
+    public void writeRow(InternalRow row) {
+        for (int i = 0; i < fieldWriters.length; i++) {
+            fieldWriters[i].write(row, i, true);
+        }
+        recordsCount++;
+    }
+
+    public void finish() {
+        root.setRowCount(recordsCount);
+        for (ArrowFieldWriter<InternalRow> fieldWriter : fieldWriters) {
+            fieldWriter.finish();
+        }
+    }
+}
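
A minimal usage sketch of this writer, assuming an Arrow schema derived from the Fluss RowType (e.g. via LanceArrowUtils.toArrowSchema) and some rows to serialize; arrowSchema, rowType and rows are assumed inputs:

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;

    try (BufferAllocator allocator = new RootAllocator();
            VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
        ArrowWriter writer = ArrowWriter.create(root, rowType);
        for (InternalRow row : rows) {
            writer.writeRow(row); // each column serialized by its ArrowFieldWriter
        }
        writer.finish(); // sets the batch row count on the VectorSchemaRoot
        // `root` now holds one Arrow record batch, ready to be handed to Lance.
    }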

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java

Lines changed: 130 additions & 0 deletions
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.fluss.utils.Preconditions.checkArgument;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** A custom Arrow reader that accepts writes of Fluss internal rows while the data is read in batches. */
+public class LanceArrowWriter extends ArrowReader {
+    private final Schema schema;
+    private final RowType rowType;
+    private final int batchSize;
+
+    private volatile boolean finished;
+
+    private final AtomicLong totalBytesRead = new AtomicLong();
+    private ArrowWriter arrowWriter = null;
+    private final AtomicInteger count = new AtomicInteger(0);
+    private final Semaphore writeToken;
+    private final Semaphore loadToken;
+
+    public LanceArrowWriter(
+            BufferAllocator allocator, Schema schema, int batchSize, RowType rowType) {
+        super(allocator);
+        checkNotNull(schema);
+        checkArgument(batchSize > 0);
+        this.schema = schema;
+        this.rowType = rowType;
+        this.batchSize = batchSize;
+        this.writeToken = new Semaphore(0);
+        this.loadToken = new Semaphore(0);
+    }
+
+    void write(LogRecord row) {
+        checkNotNull(row);
+        try {
+            // wait until prepareLoadNextBatch releases a write token
+            writeToken.acquire();
+            arrowWriter.writeRow(row.getRow());
+            if (count.incrementAndGet() == batchSize) {
+                // notify loadNextBatch to take the batch
+                loadToken.release();
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    void setFinished() {
+        loadToken.release();
+        finished = true;
+    }
+
+    @Override
+    public void prepareLoadNextBatch() throws IOException {
+        super.prepareLoadNextBatch();
+        arrowWriter = ArrowWriter.create(this.getVectorSchemaRoot(), rowType);
+        // release batchSize tokens for writing
+        writeToken.release(batchSize);
+    }
+
+    @Override
+    public boolean loadNextBatch() throws IOException {
+        prepareLoadNextBatch();
+        try {
+            if (finished && count.get() == 0) {
+                return false;
+            }
+            // wait until the batch is full or the writer is finished
+            loadToken.acquire();
+            arrowWriter.finish();
+            if (!finished) {
+                count.set(0);
+                return true;
+            } else {
+                // return true if the final batch still has rows, false otherwise
+                if (count.get() > 0) {
+                    count.set(0);
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public long bytesRead() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected synchronized void closeReadSource() throws IOException {
+        // Implement if needed
+    }
+
+    @Override
+    protected Schema readSchema() {
+        return this.schema;
+    }
+}
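
To make the token handshake concrete, a sketch of a producer thread feeding the reader while a consumer drains batches. Illustrative only: write and setFinished are package-private, so this would live in com.alibaba.fluss.lake.lance.tiering; imports are as in the file above, plus org.apache.arrow.memory.RootAllocator and org.apache.arrow.vector.VectorSchemaRoot. The records, arrowSchema and rowType inputs are assumptions.

    void tier(Iterable<LogRecord> records, Schema arrowSchema, RowType rowType)
            throws IOException {
        LanceArrowWriter writer =
                new LanceArrowWriter(new RootAllocator(), arrowSchema, 1024, rowType);

        Thread producer =
                new Thread(
                        () -> {
                            for (LogRecord record : records) {
                                writer.write(record); // blocks until a batch is open for writes
                            }
                            writer.setFinished(); // lets the final (partial) batch drain
                        });
        producer.start();

        // Each loadNextBatch() call opens a fresh batch (releasing 1024 write
        // tokens) and then blocks until the batch fills or setFinished() is called.
        while (writer.loadNextBatch()) {
            VectorSchemaRoot batch = writer.getVectorSchemaRoot();
            // hand `batch` to the Lance dataset writer here
        }
    }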
