Skip to content

Commit 9b7e2b1

Browse files
committed
[lake] Paimon lake table support alter table properties
1 parent 804bdf3 commit 9b7e2b1

File tree

11 files changed

+336
-1
lines changed

11 files changed

+336
-1
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.exception;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
/**
23+
* Exception for lakeCatalog trying to alter a table that not exists.
24+
*
25+
* @since 0.8
26+
*/
27+
@PublicEvolving
28+
public class LakeTableNotExistException extends ApiException {
29+
private static final long serialVersionUID = 1L;
30+
31+
public LakeTableNotExistException(String message) {
32+
this(message, null);
33+
}
34+
35+
public LakeTableNotExistException(String message, Throwable cause) {
36+
super(message, cause);
37+
}
38+
}

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.exception.TableAlreadyExistException;
22+
import org.apache.fluss.exception.TableNotExistException;
23+
import org.apache.fluss.metadata.TableChange;
2224
import org.apache.fluss.metadata.TableDescriptor;
2325
import org.apache.fluss.metadata.TablePath;
2426

27+
import java.util.List;
28+
2529
/**
2630
* A catalog interface to modify metadata in external datalake.
2731
*
@@ -40,6 +44,16 @@ public interface LakeCatalog extends AutoCloseable {
4044
void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
4145
throws TableAlreadyExistException;
4246

47+
/**
48+
* Alter a table in lake.
49+
*
50+
* @param tablePath path of the table to be altered
51+
* @param tableChanges The changes to be applied to the table
52+
* @throws TableNotExistException if the table not exists
53+
*/
54+
void alterTable(TablePath tablePath, List<TableChange> tableChanges)
55+
throws TableNotExistException;
56+
4357
@Override
4458
default void close() throws Exception {
4559
// default do nothing

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919

2020
import org.apache.fluss.config.Configuration;
2121
import org.apache.fluss.exception.TableAlreadyExistException;
22+
import org.apache.fluss.exception.TableNotExistException;
2223
import org.apache.fluss.lake.source.LakeSource;
2324
import org.apache.fluss.lake.writer.LakeTieringFactory;
25+
import org.apache.fluss.metadata.TableChange;
2426
import org.apache.fluss.metadata.TableDescriptor;
2527
import org.apache.fluss.metadata.TablePath;
2628
import org.apache.fluss.utils.TemporaryClassLoaderContext;
2729
import org.apache.fluss.utils.WrappingProxy;
2830

31+
import java.util.List;
32+
2933
/**
3034
* A wrapper around {@link LakeStoragePlugin} that ensures the plugin classloader is used for all
3135
* {@link LakeCatalog} operations.
@@ -78,6 +82,14 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
7882
}
7983
}
8084

85+
@Override
86+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
87+
throws TableNotExistException {
88+
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
89+
inner.alterTable(tablePath, tableChanges);
90+
}
91+
}
92+
8193
@Override
8294
public void close() throws Exception {
8395
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {

fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
import org.apache.fluss.config.Configuration;
2121
import org.apache.fluss.exception.TableAlreadyExistException;
22+
import org.apache.fluss.exception.TableNotExistException;
2223
import org.apache.fluss.lake.source.LakeSource;
2324
import org.apache.fluss.lake.writer.LakeTieringFactory;
25+
import org.apache.fluss.metadata.TableChange;
2426
import org.apache.fluss.metadata.TableDescriptor;
2527
import org.apache.fluss.metadata.TablePath;
2628
import org.apache.fluss.plugin.PluginManager;
@@ -30,6 +32,7 @@
3032
import java.util.Collections;
3133
import java.util.HashMap;
3234
import java.util.Iterator;
35+
import java.util.List;
3336
import java.util.Map;
3437

3538
import static org.assertj.core.api.Assertions.assertThat;
@@ -145,5 +148,9 @@ private static class TestPaimonLakeCatalog implements LakeCatalog {
145148
@Override
146149
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
147150
throws TableAlreadyExistException {}
151+
152+
@Override
153+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
154+
throws TableNotExistException {}
148155
}
149156
}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.config.Configuration;
2222
import org.apache.fluss.exception.TableAlreadyExistException;
23+
import org.apache.fluss.exception.TableNotExistException;
2324
import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
2425
import org.apache.fluss.lake.lakestorage.LakeCatalog;
26+
import org.apache.fluss.metadata.TableChange;
2527
import org.apache.fluss.metadata.TableDescriptor;
2628
import org.apache.fluss.metadata.TablePath;
2729
import org.apache.fluss.utils.IOUtils;
@@ -112,6 +114,13 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
112114
}
113115
}
114116

117+
@Override
118+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
119+
throws TableNotExistException {
120+
throw new UnsupportedOperationException(
121+
"Alter table is not supported for Iceberg at the moment");
122+
}
123+
115124
private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
116125
return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
117126
}

fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
import org.apache.fluss.config.Configuration;
2121
import org.apache.fluss.exception.InvalidTableException;
22+
import org.apache.fluss.exception.TableNotExistException;
2223
import org.apache.fluss.lake.lakestorage.LakeCatalog;
2324
import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
2425
import org.apache.fluss.lake.lance.utils.LanceDatasetAdapter;
26+
import org.apache.fluss.metadata.TableChange;
2527
import org.apache.fluss.metadata.TableDescriptor;
2628
import org.apache.fluss.metadata.TablePath;
2729

@@ -68,6 +70,13 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
6870
}
6971
}
7072

73+
@Override
74+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
75+
throws TableNotExistException {
76+
throw new UnsupportedOperationException(
77+
"Alter table is not supported for Lance at the moment");
78+
}
79+
7180
@Override
7281
public void close() throws Exception {
7382
LakeCatalog.super.close();

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
package org.apache.fluss.lake.paimon;
1919

20+
import org.apache.fluss.annotation.VisibleForTesting;
2021
import org.apache.fluss.config.Configuration;
2122
import org.apache.fluss.exception.InvalidTableException;
2223
import org.apache.fluss.exception.TableAlreadyExistException;
24+
import org.apache.fluss.exception.TableNotExistException;
2325
import org.apache.fluss.lake.lakestorage.LakeCatalog;
26+
import org.apache.fluss.metadata.TableChange;
2427
import org.apache.fluss.metadata.TableDescriptor;
2528
import org.apache.fluss.metadata.TablePath;
2629
import org.apache.fluss.utils.IOUtils;
@@ -32,13 +35,15 @@
3235
import org.apache.paimon.catalog.Identifier;
3336
import org.apache.paimon.options.Options;
3437
import org.apache.paimon.schema.Schema;
38+
import org.apache.paimon.schema.SchemaChange;
3539
import org.apache.paimon.types.DataType;
3640
import org.apache.paimon.types.DataTypes;
3741

3842
import java.util.LinkedHashMap;
3943
import java.util.List;
4044
import java.util.Map;
4145

46+
import static org.apache.fluss.lake.paimon.utils.FlussToPaimonTableChangeConverter.convert;
4247
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
4348
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
4449
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -64,14 +69,19 @@ public class PaimonLakeCatalog implements LakeCatalog {
6469
// for fluss config
6570
private static final String FLUSS_CONF_PREFIX = "fluss.";
6671
// for paimon config
67-
private static final String PAIMON_CONF_PREFIX = "paimon.";
72+
public static final String PAIMON_CONF_PREFIX = "paimon.";
6873

6974
public PaimonLakeCatalog(Configuration configuration) {
7075
this.paimonCatalog =
7176
CatalogFactory.createCatalog(
7277
CatalogContext.create(Options.fromMap(configuration.toMap())));
7378
}
7479

80+
@VisibleForTesting
81+
protected Catalog getPaimonCatalog() {
82+
return paimonCatalog;
83+
}
84+
7585
@Override
7686
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
7787
throws TableAlreadyExistException {
@@ -97,6 +107,20 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
97107
}
98108
}
99109

110+
@Override
111+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
112+
throws TableNotExistException {
113+
try {
114+
Identifier paimonPath = toPaimonIdentifier(tablePath);
115+
List<SchemaChange> paimonSchemaChanges =
116+
convert(tableChanges, this::getFlussPropertyKeyToPaimon);
117+
alterTable(paimonPath, paimonSchemaChanges);
118+
} catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
119+
// shouldn't happen for now
120+
throw new RuntimeException(e);
121+
}
122+
}
123+
100124
private void createTable(Identifier tablePath, Schema schema)
101125
throws Catalog.DatabaseNotExistException {
102126
try {
@@ -116,6 +140,15 @@ private void createDatabase(String databaseName) {
116140
}
117141
}
118142

143+
private void alterTable(Identifier tablePath, List<SchemaChange> tableChanges)
144+
throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException {
145+
try {
146+
paimonCatalog.alterTable(tablePath, tableChanges, false);
147+
} catch (Catalog.TableNotExistException e) {
148+
throw new TableNotExistException("Table " + tablePath + " not exists.");
149+
}
150+
}
151+
119152
private Identifier toPaimonIdentifier(TablePath tablePath) {
120153
return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
121154
}
@@ -190,6 +223,14 @@ private void setFlussPropertyToPaimon(String key, String value, Options options)
190223
}
191224
}
192225

226+
private String getFlussPropertyKeyToPaimon(String key) {
227+
if (key.startsWith(PAIMON_CONF_PREFIX)) {
228+
return key.substring(PAIMON_CONF_PREFIX.length());
229+
} else {
230+
return FLUSS_CONF_PREFIX + key;
231+
}
232+
}
233+
193234
@Override
194235
public void close() {
195236
IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.paimon.utils;
19+
20+
import org.apache.fluss.exception.InvalidAlterTableException;
21+
import org.apache.fluss.metadata.TableChange;
22+
23+
import org.apache.paimon.schema.SchemaChange;
24+
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.function.Function;
28+
29+
import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.PAIMON_CONF_PREFIX;
30+
31+
/** Converter for {@link TableChange} to {@link SchemaChange}. */
32+
public class FlussToPaimonTableChangeConverter {
33+
34+
public static List<SchemaChange> convert(
35+
List<TableChange> tableChanges, Function<String, String> keyFun) {
36+
List<SchemaChange> schemaChanges = new ArrayList<>(tableChanges.size());
37+
38+
for (TableChange tableChange : tableChanges) {
39+
if (tableChange instanceof TableChange.SetOption) {
40+
TableChange.SetOption setOption = (TableChange.SetOption) tableChange;
41+
String key = setOption.getKey();
42+
// Here we only prohibit direct modification of Paimon properties;
43+
// the modification rules for Fluss properties are determined by the upstream
44+
// interface.
45+
if (key.startsWith(PAIMON_CONF_PREFIX)) {
46+
throw new InvalidAlterTableException(
47+
"Alter paimon table property is not supported.");
48+
}
49+
schemaChanges.add(
50+
SchemaChange.setOption(
51+
keyFun.apply(setOption.getKey()), setOption.getValue()));
52+
} else if (tableChange instanceof TableChange.ResetOption) {
53+
TableChange.ResetOption resetOption = (TableChange.ResetOption) tableChange;
54+
String key = resetOption.getKey();
55+
// Here we only prohibit direct modification of Paimon properties;
56+
// the modification rules for Fluss properties are determined by the upstream
57+
// interface.
58+
if (key.startsWith(PAIMON_CONF_PREFIX)) {
59+
throw new InvalidAlterTableException(
60+
"Alter paimon table property is not supported.");
61+
}
62+
schemaChanges.add(SchemaChange.removeOption(keyFun.apply(resetOption.getKey())));
63+
} else {
64+
throw new UnsupportedOperationException(
65+
"Unsupported table change: " + tableChange.getClass());
66+
}
67+
}
68+
69+
return schemaChanges;
70+
}
71+
}

0 commit comments

Comments
 (0)