Skip to content

Commit 34bc5f7

Browse files
committed
[lake] LakeCatalog supports multi-tenancy
1 parent 23058ad commit 34bc5f7

File tree

11 files changed

+106
-26
lines changed

11 files changed

+106
-26
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.metadata.TableChange;
2424
import org.apache.fluss.metadata.TableDescriptor;
2525
import org.apache.fluss.metadata.TablePath;
26+
import org.apache.fluss.security.acl.FlussPrincipal;
2627

2728
import java.util.List;
2829

@@ -39,23 +40,40 @@ public interface LakeCatalog extends AutoCloseable {
3940
*
4041
* @param tablePath path of the table to be created
4142
* @param tableDescriptor The descriptor of the table to be created
43+
* @param context contextual information needed for create table
4244
* @throws TableAlreadyExistException if the table already exists
4345
*/
44-
void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
46+
void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
4547
throws TableAlreadyExistException;
4648

4749
/**
4850
* Alter a table in lake.
4951
*
5052
* @param tablePath path of the table to be altered
5153
* @param tableChanges The changes to be applied to the table
54+
* @param context contextual information needed for alter table
5255
* @throws TableNotExistException if the table not exists
5356
*/
54-
void alterTable(TablePath tablePath, List<TableChange> tableChanges)
57+
void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
5558
throws TableNotExistException;
5659

5760
@Override
5861
default void close() throws Exception {
5962
// default do nothing
6063
}
64+
65+
/**
66+
* Contextual information for lake catalog methods that modify metadata in an external data
67+
* lake. It can be used to:
68+
*
69+
* <ul>
70+
* <li>Access the fluss principal currently accessing the catalog.
71+
* </ul>
72+
*/
73+
@PublicEvolving
74+
interface Context {
75+
76+
/** Get the fluss principal currently accessing the catalog. */
77+
FlussPrincipal getFlussPrincipal();
78+
}
6179
}

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,18 +75,19 @@ private ClassLoaderFixingLakeCatalog(final LakeCatalog inner, final ClassLoader
7575
}
7676

7777
@Override
78-
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
78+
public void createTable(
79+
TablePath tablePath, TableDescriptor tableDescriptor, Context context)
7980
throws TableAlreadyExistException {
8081
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
81-
inner.createTable(tablePath, tableDescriptor);
82+
inner.createTable(tablePath, tableDescriptor, context);
8283
}
8384
}
8485

8586
@Override
86-
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
87+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
8788
throws TableNotExistException {
8889
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
89-
inner.alterTable(tablePath, tableChanges);
90+
inner.alterTable(tablePath, tableChanges, context);
9091
}
9192
}
9293

fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,12 @@ public LakeSource<?> createLakeSource(TablePath tablePath) {
146146
private static class TestPaimonLakeCatalog implements LakeCatalog {
147147

148148
@Override
149-
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
149+
public void createTable(
150+
TablePath tablePath, TableDescriptor tableDescriptor, Context context)
150151
throws TableAlreadyExistException {}
151152

152153
@Override
153-
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
154+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
154155
throws TableNotExistException {}
155156
}
156157
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.lakestorage;
19+
20+
import org.apache.fluss.security.acl.FlussPrincipal;
21+
22+
/** A testing implementation of {@link LakeCatalog.Context}. */
23+
public class TestingLakeCatalogContext implements LakeCatalog.Context {
24+
25+
@Override
26+
public FlussPrincipal getFlussPrincipal() {
27+
return null;
28+
}
29+
}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ protected Catalog getIcebergCatalog() {
8484
}
8585

8686
@Override
87-
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
87+
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
8888
throws TableAlreadyExistException {
8989
// convert Fluss table path to iceberg table
9090
boolean isPkTable = tableDescriptor.hasPrimaryKey();
@@ -117,7 +117,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
117117
}
118118

119119
@Override
120-
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
120+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
121121
throws TableNotExistException {
122122
throw new UnsupportedOperationException(
123123
"Alter table is not supported for Iceberg at the moment");

fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public LanceLakeCatalog(Configuration config) {
4343
}
4444

4545
@Override
46-
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
46+
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) {
4747
// currently, we don't support primary key table for lance
4848
if (tableDescriptor.hasPrimaryKey()) {
4949
throw new InvalidTableException(
@@ -71,7 +71,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
7171
}
7272

7373
@Override
74-
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
74+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
7575
throws TableNotExistException {
7676
throw new UnsupportedOperationException(
7777
"Alter table is not supported for Lance at the moment");

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected Catalog getPaimonCatalog() {
7777
}
7878

7979
@Override
80-
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
80+
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
8181
throws TableAlreadyExistException {
8282
// then, create the table
8383
Identifier paimonPath = toPaimon(tablePath);
@@ -102,7 +102,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
102102
}
103103

104104
@Override
105-
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
105+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
106106
throws TableNotExistException {
107107
try {
108108
Identifier paimonPath = toPaimon(tablePath);

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.lake.paimon;
1919

2020
import org.apache.fluss.config.Configuration;
21+
import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
2122
import org.apache.fluss.metadata.Schema;
2223
import org.apache.fluss.metadata.TableChange;
2324
import org.apache.fluss.metadata.TableDescriptor;
@@ -62,14 +63,20 @@ void testAlterTableConfigs() throws Exception {
6263
assertThat(table.options().get("key")).isEqualTo(null);
6364

6465
// set the value for key
65-
flussPaimonCatalog.alterTable(tablePath, Arrays.asList(TableChange.set("key", "value")));
66+
flussPaimonCatalog.alterTable(
67+
tablePath,
68+
Arrays.asList(TableChange.set("key", "value")),
69+
new TestingLakeCatalogContext());
6670

6771
table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
6872
// we have set the value for key
6973
assertThat(table.options().get("fluss.key")).isEqualTo("value");
7074

7175
// reset the value for key
72-
flussPaimonCatalog.alterTable(tablePath, Arrays.asList(TableChange.reset("key")));
76+
flussPaimonCatalog.alterTable(
77+
tablePath,
78+
Arrays.asList(TableChange.reset("key")),
79+
new TestingLakeCatalogContext());
7380

7481
table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
7582
// we have reset the value for key
@@ -93,6 +100,6 @@ private void createTable(String database, String tableName) {
93100

94101
TablePath tablePath = TablePath.of(database, tableName);
95102

96-
flussPaimonCatalog.createTable(tablePath, td);
103+
flussPaimonCatalog.createTable(tablePath, td, new TestingLakeCatalogContext());
97104
}
98105
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.exception.TableAlreadyExistException;
3434
import org.apache.fluss.exception.TableNotPartitionedException;
3535
import org.apache.fluss.fs.FileSystem;
36+
import org.apache.fluss.lake.lakestorage.LakeCatalog;
3637
import org.apache.fluss.metadata.DataLakeFormat;
3738
import org.apache.fluss.metadata.DatabaseDescriptor;
3839
import org.apache.fluss.metadata.DeleteBehavior;
@@ -84,6 +85,7 @@
8485
import org.apache.fluss.rpc.protocol.ApiError;
8586
import org.apache.fluss.security.acl.AclBinding;
8687
import org.apache.fluss.security.acl.AclBindingFilter;
88+
import org.apache.fluss.security.acl.FlussPrincipal;
8789
import org.apache.fluss.security.acl.OperationType;
8890
import org.apache.fluss.security.acl.Resource;
8991
import org.apache.fluss.server.DynamicConfigManager;
@@ -287,7 +289,10 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
287289
if (isDataLakeEnabled(tableDescriptor)) {
288290
try {
289291
checkNotNull(lakeCatalogContainer.getLakeCatalog())
290-
.createTable(tablePath, tableDescriptor);
292+
.createTable(
293+
tablePath,
294+
tableDescriptor,
295+
new DefaultLakeCatalogContext(currentSession().getPrincipal()));
291296
} catch (TableAlreadyExistException e) {
292297
throw new LakeTableAlreadyExistException(
293298
String.format(
@@ -326,7 +331,8 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
326331
request.isIgnoreIfNotExists(),
327332
lakeCatalogContainer.getLakeCatalog(),
328333
lakeCatalogContainer.getDataLakeFormat(),
329-
lakeTableTieringManager);
334+
lakeTableTieringManager,
335+
new DefaultLakeCatalogContext(currentSession().getPrincipal()));
330336

331337
return CompletableFuture.completedFuture(new AlterTableResponse());
332338
}
@@ -757,4 +763,18 @@ private void validateTableCreationPermission(
757763
}
758764
}
759765
}
766+
767+
static class DefaultLakeCatalogContext implements LakeCatalog.Context {
768+
769+
private final FlussPrincipal flussPrincipal;
770+
771+
public DefaultLakeCatalogContext(FlussPrincipal flussPrincipal) {
772+
this.flussPrincipal = flussPrincipal;
773+
}
774+
775+
@Override
776+
public FlussPrincipal getFlussPrincipal() {
777+
return flussPrincipal;
778+
}
779+
}
760780
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,8 @@ public void alterTableProperties(
318318
boolean ignoreIfNotExists,
319319
@Nullable LakeCatalog lakeCatalog,
320320
@Nullable DataLakeFormat dataLakeFormat,
321-
LakeTableTieringManager lakeTableTieringManager) {
321+
LakeTableTieringManager lakeTableTieringManager,
322+
LakeCatalog.Context lakeCatalogContext) {
322323
try {
323324
// it throws TableNotExistException if the table or database not exists
324325
TableRegistration tableReg = getTableRegistration(tablePath);
@@ -349,7 +350,8 @@ public void alterTableProperties(
349350
newDescriptor,
350351
tableChanges,
351352
lakeCatalog,
352-
dataLakeFormat);
353+
dataLakeFormat,
354+
lakeCatalogContext);
353355
// update the table to zk
354356
TableRegistration updatedTableRegistration =
355357
tableReg.newProperties(
@@ -388,7 +390,8 @@ private void preAlterTableProperties(
388390
TableDescriptor newDescriptor,
389391
List<TableChange> tableChanges,
390392
LakeCatalog lakeCatalog,
391-
DataLakeFormat dataLakeFormat) {
393+
DataLakeFormat dataLakeFormat,
394+
LakeCatalog.Context lakeCatalogContext) {
392395
if (isDataLakeEnabled(newDescriptor)) {
393396
if (lakeCatalog == null) {
394397
throw new InvalidAlterTableException(
@@ -402,7 +405,7 @@ private void preAlterTableProperties(
402405
if (!isDataLakeEnabled(tableDescriptor)) {
403406
// before create table in fluss, we may create in lake
404407
try {
405-
lakeCatalog.createTable(tablePath, newDescriptor);
408+
lakeCatalog.createTable(tablePath, newDescriptor, lakeCatalogContext);
406409
// no need to alter lake table if it is newly created
407410
isLakeTableNewlyCreated = true;
408411
} catch (TableAlreadyExistException e) {
@@ -421,7 +424,7 @@ private void preAlterTableProperties(
421424
if (!isLakeTableNewlyCreated) {
422425
{
423426
try {
424-
lakeCatalog.alterTable(tablePath, tableChanges);
427+
lakeCatalog.alterTable(tablePath, tableChanges, lakeCatalogContext);
425428
} catch (TableNotExistException e) {
426429
throw new FlussRuntimeException(
427430
"Lake table doesn't exists for lake-enabled table "

0 commit comments

Comments
 (0)