Skip to content

Commit ee46508

Browse files
committed
[lake] Paimon lake table support alter table properties
1 parent 4345663 commit ee46508

File tree

14 files changed

+433
-42
lines changed

14 files changed

+433
-42
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.exception.TableAlreadyExistException;
22+
import org.apache.fluss.exception.TableNotExistException;
23+
import org.apache.fluss.metadata.TableChange;
2224
import org.apache.fluss.metadata.TableDescriptor;
2325
import org.apache.fluss.metadata.TablePath;
2426

27+
import java.util.List;
28+
2529
/**
2630
* A catalog interface to modify metadata in external datalake.
2731
*
@@ -40,6 +44,16 @@ public interface LakeCatalog extends AutoCloseable {
4044
void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
4145
throws TableAlreadyExistException;
4246

47+
/**
48+
* Alter a table in lake.
49+
*
50+
* @param tablePath path of the table to be altered
51+
* @param tableChanges The changes to be applied to the table
52+
* @throws TableNotExistException if the table not exists
53+
*/
54+
void alterTable(TablePath tablePath, List<TableChange> tableChanges)
55+
throws TableNotExistException;
56+
4357
@Override
4458
default void close() throws Exception {
4559
// default do nothing

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919

2020
import org.apache.fluss.config.Configuration;
2121
import org.apache.fluss.exception.TableAlreadyExistException;
22+
import org.apache.fluss.exception.TableNotExistException;
2223
import org.apache.fluss.lake.source.LakeSource;
2324
import org.apache.fluss.lake.writer.LakeTieringFactory;
25+
import org.apache.fluss.metadata.TableChange;
2426
import org.apache.fluss.metadata.TableDescriptor;
2527
import org.apache.fluss.metadata.TablePath;
2628
import org.apache.fluss.utils.TemporaryClassLoaderContext;
2729
import org.apache.fluss.utils.WrappingProxy;
2830

31+
import java.util.List;
32+
2933
/**
3034
* A wrapper around {@link LakeStoragePlugin} that ensures the plugin classloader is used for all
3135
* {@link LakeCatalog} operations.
@@ -78,6 +82,14 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
7882
}
7983
}
8084

85+
@Override
86+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
87+
throws TableNotExistException {
88+
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
89+
inner.alterTable(tablePath, tableChanges);
90+
}
91+
}
92+
8193
@Override
8294
public void close() throws Exception {
8395
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {

fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,15 @@ public TableDescriptor withProperties(Map<String, String> newProperties) {
236236
schema, comment, partitionKeys, tableDistribution, newProperties, customProperties);
237237
}
238238

239+
/**
240+
* Returns a new TableDescriptor instance that is a copy of this TableDescriptor with a new
241+
* custom properties.
242+
*/
243+
public TableDescriptor withCustomProperties(Map<String, String> newCustomProperties) {
244+
return new TableDescriptor(
245+
schema, comment, partitionKeys, tableDistribution, properties, newCustomProperties);
246+
}
247+
239248
/**
240249
* Returns a new TableDescriptor instance that is a copy of this TableDescriptor with a new
241250
* properties and new customProperties.

fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
import org.apache.fluss.config.Configuration;
2121
import org.apache.fluss.exception.TableAlreadyExistException;
22+
import org.apache.fluss.exception.TableNotExistException;
2223
import org.apache.fluss.lake.source.LakeSource;
2324
import org.apache.fluss.lake.writer.LakeTieringFactory;
25+
import org.apache.fluss.metadata.TableChange;
2426
import org.apache.fluss.metadata.TableDescriptor;
2527
import org.apache.fluss.metadata.TablePath;
2628
import org.apache.fluss.plugin.PluginManager;
@@ -30,6 +32,7 @@
3032
import java.util.Collections;
3133
import java.util.HashMap;
3234
import java.util.Iterator;
35+
import java.util.List;
3336
import java.util.Map;
3437

3538
import static org.assertj.core.api.Assertions.assertThat;
@@ -145,5 +148,9 @@ private static class TestPaimonLakeCatalog implements LakeCatalog {
145148
@Override
146149
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
147150
throws TableAlreadyExistException {}
151+
152+
@Override
153+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
154+
throws TableNotExistException {}
148155
}
149156
}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.config.Configuration;
2222
import org.apache.fluss.exception.TableAlreadyExistException;
23+
import org.apache.fluss.exception.TableNotExistException;
2324
import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
2425
import org.apache.fluss.lake.lakestorage.LakeCatalog;
26+
import org.apache.fluss.metadata.TableChange;
2527
import org.apache.fluss.metadata.TableDescriptor;
2628
import org.apache.fluss.metadata.TablePath;
2729
import org.apache.fluss.utils.IOUtils;
@@ -112,6 +114,13 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
112114
}
113115
}
114116

117+
@Override
118+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
119+
throws TableNotExistException {
120+
throw new UnsupportedOperationException(
121+
"Alter table is not supported for Iceberg at the moment");
122+
}
123+
115124
private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
116125
return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
117126
}

fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
import org.apache.fluss.config.Configuration;
2121
import org.apache.fluss.exception.InvalidTableException;
22+
import org.apache.fluss.exception.TableNotExistException;
2223
import org.apache.fluss.lake.lakestorage.LakeCatalog;
2324
import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
2425
import org.apache.fluss.lake.lance.utils.LanceDatasetAdapter;
26+
import org.apache.fluss.metadata.TableChange;
2527
import org.apache.fluss.metadata.TableDescriptor;
2628
import org.apache.fluss.metadata.TablePath;
2729

@@ -68,6 +70,13 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
6870
}
6971
}
7072

73+
@Override
74+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
75+
throws TableNotExistException {
76+
throw new UnsupportedOperationException(
77+
"Alter table is not supported for Lance at the moment");
78+
}
79+
7180
@Override
7281
public void close() throws Exception {
7382
LakeCatalog.super.close();

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
package org.apache.fluss.lake.paimon;
1919

20+
import org.apache.fluss.annotation.VisibleForTesting;
2021
import org.apache.fluss.config.Configuration;
2122
import org.apache.fluss.exception.InvalidTableException;
2223
import org.apache.fluss.exception.TableAlreadyExistException;
24+
import org.apache.fluss.exception.TableNotExistException;
2325
import org.apache.fluss.lake.lakestorage.LakeCatalog;
26+
import org.apache.fluss.metadata.TableChange;
2427
import org.apache.fluss.metadata.TableDescriptor;
2528
import org.apache.fluss.metadata.TablePath;
2629
import org.apache.fluss.utils.IOUtils;
@@ -32,13 +35,15 @@
3235
import org.apache.paimon.catalog.Identifier;
3336
import org.apache.paimon.options.Options;
3437
import org.apache.paimon.schema.Schema;
38+
import org.apache.paimon.schema.SchemaChange;
3539
import org.apache.paimon.types.DataType;
3640
import org.apache.paimon.types.DataTypes;
3741

3842
import java.util.LinkedHashMap;
3943
import java.util.List;
4044
import java.util.Map;
4145

46+
import static org.apache.fluss.lake.paimon.utils.FlussToPaimonTableChangeConverter.convert;
4247
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
4348
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
4449
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -72,6 +77,11 @@ public PaimonLakeCatalog(Configuration configuration) {
7277
CatalogContext.create(Options.fromMap(configuration.toMap())));
7378
}
7479

80+
@VisibleForTesting
81+
protected Catalog getPaimonCatalog() {
82+
return paimonCatalog;
83+
}
84+
7585
@Override
7686
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
7787
throws TableAlreadyExistException {
@@ -97,6 +107,20 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
97107
}
98108
}
99109

110+
@Override
111+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
112+
throws TableNotExistException {
113+
try {
114+
Identifier paimonPath = toPaimonIdentifier(tablePath);
115+
List<SchemaChange> paimonSchemaChanges =
116+
convert(tableChanges, this::getFlussPropertyKeyToPaimon);
117+
alterTable(paimonPath, paimonSchemaChanges);
118+
} catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
119+
// shouldn't happen for now
120+
throw new RuntimeException(e);
121+
}
122+
}
123+
100124
private void createTable(Identifier tablePath, Schema schema)
101125
throws Catalog.DatabaseNotExistException {
102126
try {
@@ -116,6 +140,15 @@ private void createDatabase(String databaseName) {
116140
}
117141
}
118142

143+
private void alterTable(Identifier tablePath, List<SchemaChange> tableChanges)
144+
throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException {
145+
try {
146+
paimonCatalog.alterTable(tablePath, tableChanges, false);
147+
} catch (Catalog.TableNotExistException e) {
148+
throw new TableNotExistException("Table " + tablePath + " not exists.");
149+
}
150+
}
151+
119152
private Identifier toPaimonIdentifier(TablePath tablePath) {
120153
return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
121154
}
@@ -190,6 +223,14 @@ private void setFlussPropertyToPaimon(String key, String value, Options options)
190223
}
191224
}
192225

226+
private String getFlussPropertyKeyToPaimon(String key) {
227+
if (key.startsWith(PAIMON_CONF_PREFIX)) {
228+
return key.substring(PAIMON_CONF_PREFIX.length());
229+
} else {
230+
return FLUSS_CONF_PREFIX + key;
231+
}
232+
}
233+
193234
@Override
194235
public void close() {
195236
IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.paimon.utils;
19+
20+
import org.apache.fluss.metadata.TableChange;
21+
22+
import org.apache.paimon.schema.SchemaChange;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import java.util.function.Function;
27+
28+
/** Converter for {@link TableChange} to {@link SchemaChange}. */
29+
public class FlussToPaimonTableChangeConverter {
30+
31+
public static List<SchemaChange> convert(
32+
List<TableChange> tableChanges, Function<String, String> keyFun) {
33+
List<SchemaChange> schemaChanges = new ArrayList<>(tableChanges.size());
34+
35+
for (TableChange tableChange : tableChanges) {
36+
if (tableChange instanceof TableChange.SetOption) {
37+
TableChange.SetOption setOption = (TableChange.SetOption) tableChange;
38+
schemaChanges.add(
39+
SchemaChange.setOption(
40+
keyFun.apply(setOption.getKey()), setOption.getValue()));
41+
} else if (tableChange instanceof TableChange.ResetOption) {
42+
TableChange.ResetOption resetOption = (TableChange.ResetOption) tableChange;
43+
schemaChanges.add(SchemaChange.removeOption(keyFun.apply(resetOption.getKey())));
44+
} else {
45+
throw new UnsupportedOperationException(
46+
"Unsupported table change: " + tableChange.getClass());
47+
}
48+
}
49+
50+
return schemaChanges;
51+
}
52+
}

0 commit comments

Comments
 (0)