diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java index 2dce054716..4048ca007b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java @@ -23,6 +23,7 @@ import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.security.acl.FlussPrincipal; import java.util.List; @@ -39,9 +40,10 @@ public interface LakeCatalog extends AutoCloseable { * * @param tablePath path of the table to be created * @param tableDescriptor The descriptor of the table to be created + * @param context contextual information needed for create table * @throws TableAlreadyExistException if the table already exists */ - void createTable(TablePath tablePath, TableDescriptor tableDescriptor) + void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) throws TableAlreadyExistException; /** @@ -49,13 +51,31 @@ void createTable(TablePath tablePath, TableDescriptor tableDescriptor) * * @param tablePath path of the table to be altered * @param tableChanges The changes to be applied to the table + * @param context contextual information needed for alter table * @throws TableNotExistException if the table not exists */ - void alterTable(TablePath tablePath, List tableChanges) + void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException; @Override default void close() throws Exception { // default do nothing } + + /** + * Contextual information for lake catalog methods that modify metadata in an external data + * lake. It can be used to: + * + * + * + * @since 0.9 + */ + @PublicEvolving + interface Context { + + /** Get the fluss principal currently accessing the catalog. */ + FlussPrincipal getFlussPrincipal(); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java index 9c75d36094..37cd17ad7e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java @@ -75,18 +75,19 @@ private ClassLoaderFixingLakeCatalog(final LakeCatalog inner, final ClassLoader } @Override - public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) + public void createTable( + TablePath tablePath, TableDescriptor tableDescriptor, Context context) throws TableAlreadyExistException { try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { - inner.createTable(tablePath, tableDescriptor); + inner.createTable(tablePath, tableDescriptor, context); } } @Override - public void alterTable(TablePath tablePath, List tableChanges) + public void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException { try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { - inner.alterTable(tablePath, tableChanges); + inner.alterTable(tablePath, tableChanges, context); } } diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java index 5812cc3cad..178ec37af2 100644 --- a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java @@ -146,11 +146,12 @@ public LakeSource createLakeSource(TablePath tablePath) { private static class TestPaimonLakeCatalog implements LakeCatalog { @Override - public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) + public void createTable( + TablePath tablePath, TableDescriptor tableDescriptor, Context context) throws TableAlreadyExistException {} @Override - public void alterTable(TablePath tablePath, List tableChanges) + public void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException {} } } diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java new file mode 100644 index 0000000000..b57ff94cfa --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lakestorage; + +import org.apache.fluss.security.acl.FlussPrincipal; + +/** A testing implementation of {@link LakeCatalog.Context}. */ +public class TestingLakeCatalogContext implements LakeCatalog.Context { + + @Override + public FlussPrincipal getFlussPrincipal() { + return null; + } +} diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java index 3b1e4bd0d3..4e90cb7003 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java @@ -84,7 +84,7 @@ protected Catalog getIcebergCatalog() { } @Override - public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) + public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) throws TableAlreadyExistException { // convert Fluss table path to iceberg table boolean isPkTable = tableDescriptor.hasPrimaryKey(); @@ -117,7 +117,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) } @Override - public void alterTable(TablePath tablePath, List tableChanges) + public void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException { throw new UnsupportedOperationException( "Alter table is not supported for Iceberg at the moment"); diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java index 725fa4fe56..37544d9719 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java @@ -20,6 +20,7 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.InvalidTableException; +import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; @@ -86,7 +87,8 @@ void testPropertyPrefixRewriting() { .build(); TablePath tablePath = TablePath.of(database, tableName); - flussIcebergCatalog.createTable(tablePath, tableDescriptor); + flussIcebergCatalog.createTable( + tablePath, tableDescriptor, new TestingLakeCatalogContext()); Table created = flussIcebergCatalog @@ -118,7 +120,8 @@ void testCreatePrimaryKeyTable() { TablePath tablePath = TablePath.of(database, tableName); - flussIcebergCatalog.createTable(tablePath, tableDescriptor); + flussIcebergCatalog.createTable( + tablePath, tableDescriptor, new TestingLakeCatalogContext()); TableIdentifier tableId = TableIdentifier.of(database, tableName); Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId); @@ -175,7 +178,8 @@ void testCreatePartitionedPrimaryKeyTable() { TablePath tablePath = TablePath.of(database, tableName); - flussIcebergCatalog.createTable(tablePath, tableDescriptor); + flussIcebergCatalog.createTable( + tablePath, tableDescriptor, new TestingLakeCatalogContext()); TableIdentifier tableId = TableIdentifier.of(database, tableName); Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId); @@ -254,7 +258,12 @@ void rejectsPrimaryKeyTableWithMultipleBucketKeys() { TablePath tablePath = TablePath.of(database, tableName); - assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, tableDescriptor)) + assertThatThrownBy( + () -> + flussIcebergCatalog.createTable( + tablePath, + tableDescriptor, + new TestingLakeCatalogContext())) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("Only one bucket key is supported for Iceberg"); } @@ -279,7 +288,7 @@ void testCreateLogTable() { .build(); TablePath tablePath = TablePath.of(database, tableName); - flussIcebergCatalog.createTable(tablePath, td); + flussIcebergCatalog.createTable(tablePath, td, new TestingLakeCatalogContext()); TableIdentifier tableId = TableIdentifier.of(database, tableName); Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId); @@ -336,7 +345,7 @@ void testCreatePartitionedLogTable() { .build(); TablePath path = TablePath.of(database, tableName); - flussIcebergCatalog.createTable(path, td); + flussIcebergCatalog.createTable(path, td, new TestingLakeCatalogContext()); Table createdTable = flussIcebergCatalog @@ -401,7 +410,12 @@ void rejectsLogTableWithMultipleBucketKeys() { TablePath tablePath = TablePath.of(database, tableName); // Do not allow multiple bucket keys for log table - assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, tableDescriptor)) + assertThatThrownBy( + () -> + flussIcebergCatalog.createTable( + tablePath, + tableDescriptor, + new TestingLakeCatalogContext())) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("Only one bucket key is supported for Iceberg"); } @@ -432,7 +446,11 @@ void testIllegalPartitionKeyType(boolean isPrimaryKeyTable) throws Exception { tableDescriptor.partitionedBy(partitionKeys); Assertions.assertThatThrownBy( - () -> flussIcebergCatalog.createTable(t1, tableDescriptor.build())) + () -> + flussIcebergCatalog.createTable( + t1, + tableDescriptor.build(), + new TestingLakeCatalogContext())) .isInstanceOf(InvalidTableException.class) .hasMessage( "Partition key only support string type for iceberg currently. Column `c1` is not string type."); diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java index 2a55fc46ab..600dcbd0d9 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java @@ -43,7 +43,7 @@ public LanceLakeCatalog(Configuration config) { } @Override - public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) { + public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) { // currently, we don't support primary key table for lance if (tableDescriptor.hasPrimaryKey()) { throw new InvalidTableException( @@ -71,7 +71,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) { } @Override - public void alterTable(TablePath tablePath, List tableChanges) + public void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException { throw new UnsupportedOperationException( "Alter table is not supported for Lance at the moment"); diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java index b11d5adf2f..22a189208c 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java @@ -77,7 +77,7 @@ protected Catalog getPaimonCatalog() { } @Override - public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) + public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) throws TableAlreadyExistException { // then, create the table Identifier paimonPath = toPaimon(tablePath); @@ -102,7 +102,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) } @Override - public void alterTable(TablePath tablePath, List tableChanges) + public void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException { try { Identifier paimonPath = toPaimon(tablePath); diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java index c2275fe21e..a9959d8216 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.lake.paimon; import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; @@ -62,14 +63,20 @@ void testAlterTableConfigs() throws Exception { assertThat(table.options().get("key")).isEqualTo(null); // set the value for key - flussPaimonCatalog.alterTable(tablePath, Arrays.asList(TableChange.set("key", "value"))); + flussPaimonCatalog.alterTable( + tablePath, + Arrays.asList(TableChange.set("key", "value")), + new TestingLakeCatalogContext()); table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); // we have set the value for key assertThat(table.options().get("fluss.key")).isEqualTo("value"); // reset the value for key - flussPaimonCatalog.alterTable(tablePath, Arrays.asList(TableChange.reset("key"))); + flussPaimonCatalog.alterTable( + tablePath, + Arrays.asList(TableChange.reset("key")), + new TestingLakeCatalogContext()); table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); // we have reset the value for key @@ -93,6 +100,6 @@ private void createTable(String database, String tableName) { TablePath tablePath = TablePath.of(database, tableName); - flussPaimonCatalog.createTable(tablePath, td); + flussPaimonCatalog.createTable(tablePath, td, new TestingLakeCatalogContext()); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 8fe9285bb3..d45cc02314 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -33,6 +33,7 @@ import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotPartitionedException; import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.lake.lakestorage.LakeCatalog; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.DeleteBehavior; @@ -84,6 +85,7 @@ import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.security.acl.AclBinding; import org.apache.fluss.security.acl.AclBindingFilter; +import org.apache.fluss.security.acl.FlussPrincipal; import org.apache.fluss.security.acl.OperationType; import org.apache.fluss.security.acl.Resource; import org.apache.fluss.server.DynamicConfigManager; @@ -287,7 +289,10 @@ public CompletableFuture createTable(CreateTableRequest req if (isDataLakeEnabled(tableDescriptor)) { try { checkNotNull(lakeCatalogContainer.getLakeCatalog()) - .createTable(tablePath, tableDescriptor); + .createTable( + tablePath, + tableDescriptor, + new DefaultLakeCatalogContext(currentSession().getPrincipal())); } catch (TableAlreadyExistException e) { throw new LakeTableAlreadyExistException( String.format( @@ -326,7 +331,8 @@ public CompletableFuture alterTable(AlterTableRequest reques request.isIgnoreIfNotExists(), lakeCatalogContainer.getLakeCatalog(), lakeCatalogContainer.getDataLakeFormat(), - lakeTableTieringManager); + lakeTableTieringManager, + new DefaultLakeCatalogContext(currentSession().getPrincipal())); return CompletableFuture.completedFuture(new AlterTableResponse()); } @@ -757,4 +763,18 @@ private void validateTableCreationPermission( } } } + + static class DefaultLakeCatalogContext implements LakeCatalog.Context { + + private final FlussPrincipal flussPrincipal; + + public DefaultLakeCatalogContext(FlussPrincipal flussPrincipal) { + this.flussPrincipal = flussPrincipal; + } + + @Override + public FlussPrincipal getFlussPrincipal() { + return flussPrincipal; + } + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 5b3b85f62f..2001fa7894 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -318,7 +318,8 @@ public void alterTableProperties( boolean ignoreIfNotExists, @Nullable LakeCatalog lakeCatalog, @Nullable DataLakeFormat dataLakeFormat, - LakeTableTieringManager lakeTableTieringManager) { + LakeTableTieringManager lakeTableTieringManager, + LakeCatalog.Context lakeCatalogContext) { try { // it throws TableNotExistException if the table or database not exists TableRegistration tableReg = getTableRegistration(tablePath); @@ -349,7 +350,8 @@ public void alterTableProperties( newDescriptor, tableChanges, lakeCatalog, - dataLakeFormat); + dataLakeFormat, + lakeCatalogContext); // update the table to zk TableRegistration updatedTableRegistration = tableReg.newProperties( @@ -388,7 +390,8 @@ private void preAlterTableProperties( TableDescriptor newDescriptor, List tableChanges, LakeCatalog lakeCatalog, - DataLakeFormat dataLakeFormat) { + DataLakeFormat dataLakeFormat, + LakeCatalog.Context lakeCatalogContext) { if (isDataLakeEnabled(newDescriptor)) { if (lakeCatalog == null) { throw new InvalidAlterTableException( @@ -402,7 +405,7 @@ private void preAlterTableProperties( if (!isDataLakeEnabled(tableDescriptor)) { // before create table in fluss, we may create in lake try { - lakeCatalog.createTable(tablePath, newDescriptor); + lakeCatalog.createTable(tablePath, newDescriptor, lakeCatalogContext); // no need to alter lake table if it is newly created isLakeTableNewlyCreated = true; } catch (TableAlreadyExistException e) { @@ -421,7 +424,7 @@ private void preAlterTableProperties( if (!isLakeTableNewlyCreated) { { try { - lakeCatalog.alterTable(tablePath, tableChanges); + lakeCatalog.alterTable(tablePath, tableChanges, lakeCatalogContext); } catch (TableNotExistException e) { throw new FlussRuntimeException( "Lake table doesn't exists for lake-enabled table " diff --git a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java index deb2cddb0e..c726e22e3e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java @@ -74,7 +74,8 @@ public static class TestingPaimonCatalog implements LakeCatalog { private final Map tableByPath = new HashMap<>(); @Override - public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) + public void createTable( + TablePath tablePath, TableDescriptor tableDescriptor, Context context) throws TableAlreadyExistException { if (tableByPath.containsKey(tablePath)) { throw new TableAlreadyExistException("Table " + tablePath + " already exists"); @@ -83,7 +84,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) } @Override - public void alterTable(TablePath tablePath, List tableChanges) + public void alterTable(TablePath tablePath, List tableChanges, Context context) throws TableNotExistException { // do nothing }