Skip to content

Commit 1238b3e

Browse files
committed
rebase master
1 parent ce8dca7 commit 1238b3e

File tree

8 files changed

+71
-44
lines changed

8 files changed

+71
-44
lines changed

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,6 @@ public class LanceConfig implements Serializable {
3737
private static final String max_row_per_file = "max_row_per_file";
3838
private static final String max_rows_per_group = "max_rows_per_group";
3939
private static final String max_bytes_per_file = "max_bytes_per_file";
40-
private static final String ak = "access_key_id";
41-
private static final String sk = "secret_access_key";
42-
private static final String endpoint = "aws_endpoint";
43-
private static final String region = "aws_region";
44-
private static final String virtual_hosted_style = "virtual_hosted_style_request";
45-
private static final String allow_http = "allow_http";
4640
private static final String batch_size = "batch_size";
4741
private static final String warehouse = "warehouse";
4842

@@ -64,11 +58,17 @@ public LanceConfig(
6458
}
6559

6660
public static LanceConfig from(
67-
Map<String, String> properties, String databaseName, String tableName) {
68-
if (!properties.containsKey(warehouse)) {
61+
Map<String, String> clusterConf,
62+
Map<String, String> tableCustomProperties,
63+
String databaseName,
64+
String tableName) {
65+
if (!clusterConf.containsKey(warehouse)) {
6966
throw new IllegalArgumentException("Missing required option " + warehouse);
7067
}
71-
return new LanceConfig(databaseName, tableName, properties.get(warehouse), properties);
68+
Map<String, String> options = new HashMap<>();
69+
options.putAll(clusterConf);
70+
options.putAll(tableCustomProperties);
71+
return new LanceConfig(databaseName, tableName, clusterConf.get(warehouse), options);
7272
}
7373

7474
public static int getBatchSize(LanceConfig config) {
@@ -130,24 +130,8 @@ public static WriteParams genWriteParamsFromConfig(LanceConfig config) {
130130
return builder.build();
131131
}
132132

133-
private static Map<String, String> genStorageOptions(LanceConfig config) {
134-
Map<String, String> storageOptions = new HashMap<>();
135-
Map<String, String> maps = config.getOptions();
136-
if (maps.containsKey(ak) && maps.containsKey(sk) && maps.containsKey(endpoint)) {
137-
storageOptions.put(ak, maps.get(ak));
138-
storageOptions.put(sk, maps.get(sk));
139-
storageOptions.put(endpoint, maps.get(endpoint));
140-
}
141-
if (maps.containsKey(region)) {
142-
storageOptions.put(region, maps.get(region));
143-
}
144-
if (maps.containsKey(virtual_hosted_style)) {
145-
storageOptions.put(virtual_hosted_style, maps.get(virtual_hosted_style));
146-
}
147-
if (maps.containsKey(allow_http)) {
148-
storageOptions.put(allow_http, maps.get(allow_http));
149-
}
150-
return storageOptions;
133+
public static Map<String, String> genStorageOptions(LanceConfig config) {
134+
return config.getOptions();
151135
}
152136

153137
@Override

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
6767

6868
LanceConfig config =
6969
LanceConfig.from(
70-
options.toMap(), tablePath.getDatabaseName(), tablePath.getTableName());
70+
options.toMap(),
71+
tableDescriptor.getCustomProperties(),
72+
tablePath.getDatabaseName(),
73+
tablePath.getTableName());
7174
WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
7275

7376
List<Field> fields = new ArrayList<>();

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import java.io.IOException;
3636
import java.util.Arrays;
37+
import java.util.Collections;
3738
import java.util.LinkedHashMap;
3839
import java.util.List;
3940
import java.util.Map;
@@ -50,7 +51,10 @@ public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult, Lance
5051
public LanceLakeCommitter(Configuration options, TablePath tablePath) {
5152
this.config =
5253
LanceConfig.from(
53-
options.toMap(), tablePath.getDatabaseName(), tablePath.getTableName());
54+
options.toMap(),
55+
Collections.emptyMap(),
56+
tablePath.getDatabaseName(),
57+
tablePath.getTableName());
5458
}
5559

5660
@Override
@@ -104,13 +108,16 @@ public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSna
104108

105109
LinkedHashMap<Integer, Long> bucketEndOffset = new LinkedHashMap<>();
106110

107-
Map<String, String> options = config.getOptions();
108-
options.put("version", String.valueOf((latestLakeSnapshotIdOfLake.get() + 1)));
109111
ArrowReader reader =
110112
LanceDatasetAdapter.getArrowReader(
111-
LanceConfig.from(options, config.getDatabaseName(), config.getTableName()),
113+
LanceConfig.from(
114+
config.getOptions(),
115+
Collections.emptyMap(),
116+
config.getDatabaseName(),
117+
config.getTableName()),
112118
Arrays.asList(BUCKET_COLUMN_NAME, OFFSET_COLUMN_NAME),
113-
Arrays.asList());
119+
Collections.emptyList(),
120+
(int) (latestLakeSnapshotIdOfLake.get() + 1));
114121
VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
115122
while (reader.loadNextBatch()) {
116123
IntVector bucketVector = (IntVector) readerRoot.getVector(BUCKET_COLUMN_NAME);

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public LanceLakeWriter(Configuration options, WriterInitContext writerInitContex
5252
LanceConfig config =
5353
LanceConfig.from(
5454
options.toMap(),
55+
writerInitContext.customProperties(),
5556
writerInitContext.tablePath().getDatabaseName(),
5657
writerInitContext.tablePath().getTableName());
5758
int batchSize = LanceConfig.getBatchSize(config);

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
import org.apache.arrow.vector.ipc.ArrowReader;
3838
import org.apache.arrow.vector.types.pojo.Schema;
3939

40-
import java.util.Arrays;
40+
import javax.annotation.Nullable;
41+
42+
import java.util.Collections;
4143
import java.util.List;
4244
import java.util.Optional;
4345

@@ -92,14 +94,21 @@ public static Optional<Long> getVersion(LanceConfig config) {
9294
}
9395

9496
public static ArrowReader getArrowReader(LanceConfig config) {
95-
return getArrowReader(config, Arrays.asList(), Arrays.asList());
97+
return getArrowReader(config, Collections.emptyList(), Collections.emptyList(), null);
9698
}
9799

98100
public static ArrowReader getArrowReader(
99-
LanceConfig config, List<String> columns, List<ColumnOrdering> columnOrderings) {
100-
String uri = config.getDatasetUri();
101-
ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
102-
try (Dataset datasetRead = Dataset.open(allocator, uri, options)) {
101+
LanceConfig config,
102+
List<String> columns,
103+
List<ColumnOrdering> columnOrderings,
104+
@Nullable Integer version) {
105+
ReadOptions.Builder builder = new ReadOptions.Builder();
106+
builder.setStorageOptions(LanceConfig.genStorageOptions(config));
107+
if (version != null) {
108+
builder.setVersion(version);
109+
}
110+
try (Dataset datasetRead =
111+
Dataset.open(allocator, config.getDatasetUri(), builder.build())) {
103112
ScanOptions.Builder scanOptionBuilder = new ScanOptions.Builder();
104113
if (!columns.isEmpty()) {
105114
scanOptionBuilder.columns(columns);

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545

4646
import java.nio.file.Files;
4747
import java.util.Arrays;
48+
import java.util.HashMap;
49+
import java.util.Map;
4850

4951
import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
5052
import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -111,6 +113,8 @@ private static Configuration initConfig() {
111113

112114
@Test
113115
void testLogTable() throws Exception {
116+
Map<String, String> customProperties = new HashMap<>();
117+
customProperties.put("lance.batch_size", "256");
114118
// test bucket key log table
115119
TableDescriptor logTable =
116120
TableDescriptor.builder()
@@ -134,11 +138,13 @@ void testLogTable() throws Exception {
134138
.column("log_c16", DataTypes.TIMESTAMP())
135139
.build())
136140
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
141+
.customProperties(customProperties)
137142
.distributedBy(BUCKET_NUM, "log_c1", "log_c2")
138143
.build();
139144
TablePath logTablePath = TablePath.of(DATABASE, "log_table");
140145
admin.createTable(logTablePath, logTable, false).get();
141-
LanceConfig config = LanceConfig.from(lanceConf.toMap(), DATABASE, "log_table");
146+
LanceConfig config =
147+
LanceConfig.from(lanceConf.toMap(), customProperties, DATABASE, "log_table");
142148

143149
// check the gotten log table
144150
Field logC1 = new Field("log_c1", FieldType.nullable(new ArrowType.Int(4 * 8, true)), null);

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import java.util.ArrayList;
3737
import java.util.Arrays;
38+
import java.util.Collections;
3839
import java.util.Iterator;
3940
import java.util.List;
4041

@@ -91,7 +92,10 @@ private void checkDataInLanceAppendOnlyTable(
9192
throws Exception {
9293
LanceConfig config =
9394
LanceConfig.from(
94-
lanceConf.toMap(), tablePath.getDatabaseName(), tablePath.getTableName());
95+
lanceConf.toMap(),
96+
Collections.emptyMap(),
97+
tablePath.getDatabaseName(),
98+
tablePath.getTableName());
9599
ArrowReader reader = LanceDatasetAdapter.getArrowReader(config);
96100
VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
97101
// while (reader.loadNextBatch()) {

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,12 @@ private static Stream<Arguments> tieringWriteArgs() {
8686
void testTieringWriteTable(boolean isPartitioned) throws Exception {
8787
int bucketNum = 3;
8888
TablePath tablePath = TablePath.of("lance", "logTable");
89+
Map<String, String> customProperties = new HashMap<>();
90+
customProperties.put("lance.batch_size", "256");
8991
LanceConfig config =
9092
LanceConfig.from(
9193
configuration.toMap(),
94+
customProperties,
9295
tablePath.getDatabaseName(),
9396
tablePath.getTableName());
9497
Schema schema = createTable(tablePath, isPartitioned, null, config);
@@ -122,7 +125,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
122125
for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
123126
String partition = entry.getValue();
124127
try (LakeWriter<LanceWriteResult> lakeWriter =
125-
createLakeWriter(tablePath, bucket, partition, schema)) {
128+
createLakeWriter(tablePath, bucket, partition, schema, customProperties)) {
126129
Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
127130
Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
128131
genLogTableRecords(partition, bucket, 10);
@@ -154,7 +157,8 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
154157
committableSerializer.getVersion(), serialized);
155158
long snapshot =
156159
lakeCommitter.commit(
157-
lanceCommittable, toBucketOffsetsProperty(tableBucketOffsets));
160+
lanceCommittable,
161+
toBucketOffsetsProperty(tableBucketOffsets, partitionIdAndName, null));
158162
assertThat(snapshot).isEqualTo(1);
159163
}
160164

@@ -231,7 +235,11 @@ private LakeCommitter<LanceWriteResult, LanceCommittable> createLakeCommitter(
231235
}
232236

233237
private LakeWriter<LanceWriteResult> createLakeWriter(
234-
TablePath tablePath, int bucket, @Nullable String partition, Schema schema)
238+
TablePath tablePath,
239+
int bucket,
240+
@Nullable String partition,
241+
Schema schema,
242+
Map<String, String> customProperties)
235243
throws IOException {
236244
return lanceLakeTieringFactory.createLakeWriter(
237245
new WriterInitContext() {
@@ -256,6 +264,11 @@ public String partition() {
256264
public com.alibaba.fluss.metadata.Schema schema() {
257265
return schema;
258266
}
267+
268+
@Override
269+
public Map<String, String> customProperties() {
270+
return customProperties;
271+
}
259272
});
260273
}
261274

0 commit comments

Comments
 (0)