Skip to content

Commit be872bb

Browse files
committed
address comments
1 parent 33fc2c0 commit be872bb

File tree

9 files changed

+28
-170
lines changed

9 files changed

+28
-170
lines changed

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ public class LanceConfig implements Serializable {
3232

3333
private static final String block_size = "block_size";
3434
private static final String version = "version";
35-
private static final String index_cache_size = "index_cache_size";
36-
private static final String metadata_cache_size = "metadata_cache_size";
3735
private static final String max_row_per_file = "max_row_per_file";
3836
private static final String max_rows_per_group = "max_rows_per_group";
3937
private static final String max_bytes_per_file = "max_bytes_per_file";
@@ -83,14 +81,6 @@ public Map<String, String> getOptions() {
8381
return options;
8482
}
8583

86-
public String getDatabaseName() {
87-
return databaseName;
88-
}
89-
90-
public String getTableName() {
91-
return tableName;
92-
}
93-
9484
public String getDatasetUri() {
9585
return datasetUri;
9686
}
@@ -104,12 +94,6 @@ public static ReadOptions genReadOptionFromConfig(LanceConfig config) {
10494
if (maps.containsKey(version)) {
10595
builder.setVersion(Integer.parseInt(maps.get(version)));
10696
}
107-
if (maps.containsKey(index_cache_size)) {
108-
builder.setIndexCacheSize(Integer.parseInt(maps.get(index_cache_size)));
109-
}
110-
if (maps.containsKey(metadata_cache_size)) {
111-
builder.setMetadataCacheSize(Integer.parseInt(maps.get(metadata_cache_size)));
112-
}
11397
builder.setStorageOptions(genStorageOptions(config));
11498
return builder.build();
11599
}

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ public int getVersion() {
3939

4040
@Override
4141
public byte[] serialize(LanceCommittable lanceCommittable) throws IOException {
42-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
43-
ObjectOutputStream oos = new ObjectOutputStream(baos);
44-
oos.writeObject(lanceCommittable.committable());
45-
oos.close();
46-
return baos.toByteArray();
42+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
43+
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
44+
oos.writeObject(lanceCommittable.committable());
45+
return baos.toByteArray();
46+
}
4747
}
4848

4949
@Override
@@ -56,9 +56,9 @@ public LanceCommittable deserialize(int version, byte[] serialized) throws IOExc
5656
+ version
5757
+ ".");
5858
}
59-
ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
60-
ObjectInputStream ois = new ObjectInputStream(bais);
61-
try {
59+
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
60+
ObjectInputStream ois = new ObjectInputStream(bais)) {
61+
//noinspection unchecked
6262
return new LanceCommittable((List<FragmentMetadata>) ois.readObject());
6363
} catch (ClassNotFoundException e) {
6464
throw new IOException("Couldn't deserialize LanceCommittable", e);

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
/** Implementation of {@link LakeCommitter} for Lance. */
5151
public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult, LanceCommittable> {
5252
private final LanceConfig config;
53+
private static String commitUser = "commit-user";
5354
private final RootAllocator allocator = new RootAllocator();
5455
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
5556

@@ -77,12 +78,15 @@ public LanceCommittable toCommittable(List<LanceWriteResult> lanceWriteResults)
7778
public long commit(LanceCommittable committable, Map<String, String> snapshotProperties)
7879
throws IOException {
7980
Map<String, String> properties = new HashMap<>(snapshotProperties);
80-
properties.put("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
81+
properties.put(commitUser, FLUSS_LAKE_TIERING_COMMIT_USER);
8182
return LanceDatasetAdapter.commitAppend(config, committable.committable(), properties);
8283
}
8384

8485
@Override
85-
public void abort(LanceCommittable committable) throws IOException {}
86+
public void abort(LanceCommittable committable) throws IOException {
87+
// TODO lance does not have the API to proactively delete the written files yet, see
88+
// https://github.com/lancedb/lance/issues/4508
89+
}
8690

8791
@SuppressWarnings("checkstyle:LocalVariableName")
8892
@Nullable
@@ -140,7 +144,7 @@ private Transaction getCommittedLatestSnapshotOfLake(String commitUser) {
140144
Transaction transaction = datasetVersion.readTransaction().orElse(null);
141145
if (transaction != null
142146
&& commitUser.equals(
143-
transaction.transactionProperties().get("commit-user"))) {
147+
transaction.transactionProperties().get(commitUser))) {
144148
if (latestFlussSnapshot == null
145149
|| transaction.readVersion() > latestFlussSnapshot.readVersion()) {
146150
latestFlussSnapshot = transaction;
@@ -153,5 +157,7 @@ private Transaction getCommittedLatestSnapshotOfLake(String commitUser) {
153157
}
154158

155159
@Override
156-
public void close() throws Exception {}
160+
public void close() throws Exception {
161+
allocator.close();
162+
}
157163
}

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public LanceLakeWriter(Configuration options, WriterInitContext writerInitContex
5050
writerInitContext.tablePath().getTableName());
5151
int batchSize = LanceConfig.getBatchSize(config);
5252
Optional<Schema> schema = LanceDatasetAdapter.getSchema(config);
53-
if (!schema.isPresent()) {
53+
if (schema.isEmpty()) {
5454
throw new IOException("Fail to get dataset " + config.getDatasetUri() + " in Lance.");
5555
}
5656

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,17 @@ public int getVersion() {
3636

3737
@Override
3838
public byte[] serialize(LanceWriteResult lanceWriteResult) throws IOException {
39-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
40-
ObjectOutputStream oos = new ObjectOutputStream(baos);
41-
oos.writeObject(lanceWriteResult);
42-
oos.close();
43-
return baos.toByteArray();
39+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
40+
ObjectOutputStream oos = new ObjectOutputStream(baos); ) {
41+
oos.writeObject(lanceWriteResult);
42+
return baos.toByteArray();
43+
}
4444
}
4545

4646
@Override
4747
public LanceWriteResult deserialize(int version, byte[] serialized) throws IOException {
48-
ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
49-
ObjectInputStream ois = new ObjectInputStream(bais);
50-
try {
48+
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
49+
ObjectInputStream ois = new ObjectInputStream(bais)) {
5150
return (LanceWriteResult) ois.readObject();
5251
} catch (ClassNotFoundException e) {
5352
throw new IOException("Couldn't deserialize LanceWriteResult", e);

fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.lancedb.lance.Dataset;
2525
import com.lancedb.lance.Fragment;
2626
import com.lancedb.lance.FragmentMetadata;
27-
import com.lancedb.lance.FragmentOperation;
2827
import com.lancedb.lance.ReadOptions;
2928
import com.lancedb.lance.Transaction;
3029
import com.lancedb.lance.WriteParams;
@@ -59,25 +58,6 @@ public static Optional<Schema> getSchema(LanceConfig config) {
5958
}
6059
}
6160

62-
public static long appendFragments(LanceConfig config, List<FragmentMetadata> fragments) {
63-
FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments);
64-
String uri = config.getDatasetUri();
65-
ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
66-
try (Dataset datasetRead = Dataset.open(allocator, uri, options)) {
67-
Dataset datasetWrite =
68-
Dataset.commit(
69-
allocator,
70-
config.getDatasetUri(),
71-
appendOp,
72-
java.util.Optional.of(datasetRead.version()),
73-
options.getStorageOptions());
74-
long version = datasetWrite.version();
75-
datasetWrite.close();
76-
// Dataset.create returns version 1
77-
return version - 1;
78-
}
79-
}
80-
8161
public static long commitAppend(
8262
LanceConfig config, List<FragmentMetadata> fragments, Map<String, String> properties) {
8363
String uri = config.getDatasetUri();

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriterTest.java

Lines changed: 0 additions & 110 deletions
This file was deleted.

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import static org.assertj.core.api.Assertions.assertThat;
5151

5252
/** IT case for tiering tables to lance. */
53-
public class LanceTieringITCase extends FlinkLanceTieringTestBase {
53+
class LanceTieringITCase extends FlinkLanceTieringTestBase {
5454
protected static final String DEFAULT_DB = "fluss";
5555
private static StreamExecutionEnvironment execEnv;
5656
private static Configuration lanceConf;
@@ -124,7 +124,6 @@ private void checkSnapshotPropertyInLance(
124124

125125
private void checkDataInLanceAppendOnlyTable(LanceConfig config, List<InternalRow> expectedRows)
126126
throws Exception {
127-
128127
try (Dataset dataset =
129128
Dataset.open(
130129
allocator,

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
import static org.assertj.core.api.Assertions.assertThat;
6666

6767
/** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */
68-
public class LanceTieringTest {
68+
class LanceTieringTest {
6969
private @TempDir File tempWarehouseDir;
7070
private LanceLakeTieringFactory lanceLakeTieringFactory;
7171
private Configuration configuration;

0 commit comments

Comments
 (0)