@@ -23,6 +23,7 @@
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.security.acl.FlussPrincipal;

import java.util.List;

@@ -39,23 +40,42 @@ public interface LakeCatalog extends AutoCloseable {
*
* @param tablePath path of the table to be created
* @param tableDescriptor The descriptor of the table to be created
* @param context contextual information needed to create the table
* @throws TableAlreadyExistException if the table already exists
*/
void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
throws TableAlreadyExistException;

/**
* Alter a table in the lake.
*
* @param tablePath path of the table to be altered
* @param tableChanges The changes to be applied to the table
* @param context contextual information needed to alter the table
* @throws TableNotExistException if the table does not exist
*/
void alterTable(TablePath tablePath, List<TableChange> tableChanges)
void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
throws TableNotExistException;

@Override
default void close() throws Exception {
// do nothing by default
}

/**
* Contextual information for lake catalog methods that modify metadata in an external data
* lake. It can be used to:
*
* <ul>
<li>Access the Fluss principal currently accessing the catalog.
* </ul>
*
* @since 0.9
*/
@PublicEvolving
interface Context {

/** Get the Fluss principal currently accessing the catalog. */
FlussPrincipal getFlussPrincipal();
}
}
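
For orientation, here is a minimal sketch of how a LakeCatalog implementation could consume the new Context parameter, e.g. a delegating wrapper that records which principal triggered each metadata change. The class name AuditingLakeCatalog, the SLF4J logging dependency, and the package of TableNotExistException are illustrative assumptions; only the LakeCatalog and Context signatures come from this change.

// Illustrative sketch only, not part of this PR.
package org.apache.fluss.lake.lakestorage;

import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException; // package assumed, mirroring TableAlreadyExistException
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.security.acl.FlussPrincipal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/** Hypothetical wrapper that logs the calling Fluss principal before delegating. */
public class AuditingLakeCatalog implements LakeCatalog {

    private static final Logger LOG = LoggerFactory.getLogger(AuditingLakeCatalog.class);

    private final LakeCatalog delegate;

    public AuditingLakeCatalog(LakeCatalog delegate) {
        this.delegate = delegate;
    }

    @Override
    public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
            throws TableAlreadyExistException {
        // the principal is obtained from the context passed in by the caller
        FlussPrincipal principal = context.getFlussPrincipal();
        LOG.info("Principal {} creates lake table {}.", principal, tablePath);
        delegate.createTable(tablePath, tableDescriptor, context);
    }

    @Override
    public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
            throws TableNotExistException {
        LOG.info("Principal {} alters lake table {}.", context.getFlussPrincipal(), tablePath);
        delegate.alterTable(tablePath, tableChanges, context);
    }

    @Override
    public void close() throws Exception {
        // forward close() so the wrapped catalog can release its resources
        delegate.close();
    }
}
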
@@ -75,18 +75,19 @@ private ClassLoaderFixingLakeCatalog(final LakeCatalog inner, final ClassLoader
}

@Override
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
public void createTable(
TablePath tablePath, TableDescriptor tableDescriptor, Context context)
throws TableAlreadyExistException {
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
inner.createTable(tablePath, tableDescriptor);
inner.createTable(tablePath, tableDescriptor, context);
}
}

@Override
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
throws TableNotExistException {
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
inner.alterTable(tablePath, tableChanges);
inner.alterTable(tablePath, tableChanges, context);
}
}

@@ -146,11 +146,12 @@ public LakeSource<?> createLakeSource(TablePath tablePath) {
private static class TestPaimonLakeCatalog implements LakeCatalog {

@Override
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
public void createTable(
TablePath tablePath, TableDescriptor tableDescriptor, Context context)
throws TableAlreadyExistException {}

@Override
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
throws TableNotExistException {}
}
}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.lake.lakestorage;

import org.apache.fluss.security.acl.FlussPrincipal;

/** A testing implementation of {@link LakeCatalog.Context}. */
public class TestingLakeCatalogContext implements LakeCatalog.Context {

@Override
public FlussPrincipal getFlussPrincipal() {
return null;
}
}
@@ -84,7 +84,7 @@ protected Catalog getIcebergCatalog() {
}

@Override
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
throws TableAlreadyExistException {
// convert Fluss table path to iceberg table
boolean isPkTable = tableDescriptor.hasPrimaryKey();
@@ -117,7 +117,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
}

@Override
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
throws TableNotExistException {
throw new UnsupportedOperationException(
"Alter table is not supported for Iceberg at the moment");
@@ -20,6 +20,7 @@
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
@@ -86,7 +87,8 @@ void testPropertyPrefixRewriting() {
.build();

TablePath tablePath = TablePath.of(database, tableName);
flussIcebergCatalog.createTable(tablePath, tableDescriptor);
flussIcebergCatalog.createTable(
tablePath, tableDescriptor, new TestingLakeCatalogContext());

Table created =
flussIcebergCatalog
@@ -118,7 +120,8 @@ void testCreatePrimaryKeyTable() {

TablePath tablePath = TablePath.of(database, tableName);

flussIcebergCatalog.createTable(tablePath, tableDescriptor);
flussIcebergCatalog.createTable(
tablePath, tableDescriptor, new TestingLakeCatalogContext());

TableIdentifier tableId = TableIdentifier.of(database, tableName);
Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
@@ -175,7 +178,8 @@ void testCreatePartitionedPrimaryKeyTable() {

TablePath tablePath = TablePath.of(database, tableName);

flussIcebergCatalog.createTable(tablePath, tableDescriptor);
flussIcebergCatalog.createTable(
tablePath, tableDescriptor, new TestingLakeCatalogContext());

TableIdentifier tableId = TableIdentifier.of(database, tableName);
Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
@@ -254,7 +258,12 @@ void rejectsPrimaryKeyTableWithMultipleBucketKeys() {

TablePath tablePath = TablePath.of(database, tableName);

assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, tableDescriptor))
assertThatThrownBy(
() ->
flussIcebergCatalog.createTable(
tablePath,
tableDescriptor,
new TestingLakeCatalogContext()))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Only one bucket key is supported for Iceberg");
}
@@ -279,7 +288,7 @@ void testCreateLogTable() {
.build();

TablePath tablePath = TablePath.of(database, tableName);
flussIcebergCatalog.createTable(tablePath, td);
flussIcebergCatalog.createTable(tablePath, td, new TestingLakeCatalogContext());

TableIdentifier tableId = TableIdentifier.of(database, tableName);
Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
@@ -336,7 +345,7 @@ void testCreatePartitionedLogTable() {
.build();

TablePath path = TablePath.of(database, tableName);
flussIcebergCatalog.createTable(path, td);
flussIcebergCatalog.createTable(path, td, new TestingLakeCatalogContext());

Table createdTable =
flussIcebergCatalog
@@ -401,7 +410,12 @@ void rejectsLogTableWithMultipleBucketKeys() {
TablePath tablePath = TablePath.of(database, tableName);

// Do not allow multiple bucket keys for log table
assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, tableDescriptor))
assertThatThrownBy(
() ->
flussIcebergCatalog.createTable(
tablePath,
tableDescriptor,
new TestingLakeCatalogContext()))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Only one bucket key is supported for Iceberg");
}
@@ -432,7 +446,11 @@ void testIllegalPartitionKeyType(boolean isPrimaryKeyTable) throws Exception {
tableDescriptor.partitionedBy(partitionKeys);

Assertions.assertThatThrownBy(
() -> flussIcebergCatalog.createTable(t1, tableDescriptor.build()))
() ->
flussIcebergCatalog.createTable(
t1,
tableDescriptor.build(),
new TestingLakeCatalogContext()))
.isInstanceOf(InvalidTableException.class)
.hasMessage(
"Partition key only support string type for iceberg currently. Column `c1` is not string type.");
@@ -43,7 +43,7 @@ public LanceLakeCatalog(Configuration config) {
}

@Override
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) {
// currently, we don't support primary key table for lance
if (tableDescriptor.hasPrimaryKey()) {
throw new InvalidTableException(
@@ -71,7 +71,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
}

@Override
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
throws TableNotExistException {
throw new UnsupportedOperationException(
"Alter table is not supported for Lance at the moment");
@@ -77,7 +77,7 @@ protected Catalog getPaimonCatalog() {
}

@Override
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
throws TableAlreadyExistException {
// then, create the table
Identifier paimonPath = toPaimon(tablePath);
@@ -102,7 +102,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
}

@Override
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
throws TableNotExistException {
try {
Identifier paimonPath = toPaimon(tablePath);
@@ -18,6 +18,7 @@
package org.apache.fluss.lake.paimon;

import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
@@ -62,14 +63,20 @@ void testAlterTableConfigs() throws Exception {
assertThat(table.options().get("key")).isEqualTo(null);

// set the value for key
flussPaimonCatalog.alterTable(tablePath, Arrays.asList(TableChange.set("key", "value")));
flussPaimonCatalog.alterTable(
tablePath,
Arrays.asList(TableChange.set("key", "value")),
new TestingLakeCatalogContext());

table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
// we have set the value for key
assertThat(table.options().get("fluss.key")).isEqualTo("value");

// reset the value for key
flussPaimonCatalog.alterTable(tablePath, Arrays.asList(TableChange.reset("key")));
flussPaimonCatalog.alterTable(
tablePath,
Arrays.asList(TableChange.reset("key")),
new TestingLakeCatalogContext());

table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
// we have reset the value for key
@@ -93,6 +100,6 @@ private void createTable(String database, String tableName) {

TablePath tablePath = TablePath.of(database, tableName);

flussPaimonCatalog.createTable(tablePath, td);
flussPaimonCatalog.createTable(tablePath, td, new TestingLakeCatalogContext());
}
}
@@ -33,6 +33,7 @@
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DeleteBehavior;
@@ -84,6 +85,7 @@
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.security.acl.AclBinding;
import org.apache.fluss.security.acl.AclBindingFilter;
import org.apache.fluss.security.acl.FlussPrincipal;
import org.apache.fluss.security.acl.OperationType;
import org.apache.fluss.security.acl.Resource;
import org.apache.fluss.server.DynamicConfigManager;
@@ -287,7 +289,10 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
if (isDataLakeEnabled(tableDescriptor)) {
try {
checkNotNull(lakeCatalogContainer.getLakeCatalog())
.createTable(tablePath, tableDescriptor);
.createTable(
tablePath,
tableDescriptor,
new DefaultLakeCatalogContext(currentSession().getPrincipal()));
} catch (TableAlreadyExistException e) {
throw new LakeTableAlreadyExistException(
String.format(
@@ -326,7 +331,8 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
request.isIgnoreIfNotExists(),
lakeCatalogContainer.getLakeCatalog(),
lakeCatalogContainer.getDataLakeFormat(),
lakeTableTieringManager);
lakeTableTieringManager,
new DefaultLakeCatalogContext(currentSession().getPrincipal()));

return CompletableFuture.completedFuture(new AlterTableResponse());
}
@@ -757,4 +763,18 @@ private void validateTableCreationPermission(
}
}
}

static class DefaultLakeCatalogContext implements LakeCatalog.Context {

private final FlussPrincipal flussPrincipal;

public DefaultLakeCatalogContext(FlussPrincipal flussPrincipal) {
this.flussPrincipal = flussPrincipal;
}

@Override
public FlussPrincipal getFlussPrincipal() {
return flussPrincipal;
}
}
}