Skip to content

Commit 44f029b

Browse files
authored
[lake] LakeCatalog supports multi-tenancy (#1901)
1 parent 37f46dd commit 44f029b

File tree

12 files changed

+134
-34
lines changed

12 files changed

+134
-34
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.metadata.TableChange;
2424
import org.apache.fluss.metadata.TableDescriptor;
2525
import org.apache.fluss.metadata.TablePath;
26+
import org.apache.fluss.security.acl.FlussPrincipal;
2627

2728
import java.util.List;
2829

@@ -39,23 +40,42 @@ public interface LakeCatalog extends AutoCloseable {
3940
*
4041
* @param tablePath path of the table to be created
4142
* @param tableDescriptor The descriptor of the table to be created
43+
* @param context contextual information needed for create table
4244
* @throws TableAlreadyExistException if the table already exists
4345
*/
44-
void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
46+
void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
4547
throws TableAlreadyExistException;
4648

4749
/**
4850
* Alter a table in lake.
4951
*
5052
* @param tablePath path of the table to be altered
5153
* @param tableChanges The changes to be applied to the table
54+
* @param context contextual information needed for alter table
5255
* @throws TableNotExistException if the table not exists
5356
*/
54-
void alterTable(TablePath tablePath, List<TableChange> tableChanges)
57+
void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
5558
throws TableNotExistException;
5659

5760
@Override
5861
default void close() throws Exception {
5962
// default do nothing
6063
}
64+
65+
/**
66+
* Contextual information for lake catalog methods that modify metadata in an external data
67+
* lake. It can be used to:
68+
*
69+
* <ul>
70+
* <li>Access the fluss principal currently accessing the catalog.
71+
* </ul>
72+
*
73+
* @since 0.9
74+
*/
75+
@PublicEvolving
76+
interface Context {
77+
78+
/** Get the fluss principal currently accessing the catalog. */
79+
FlussPrincipal getFlussPrincipal();
80+
}
6181
}

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,18 +75,19 @@ private ClassLoaderFixingLakeCatalog(final LakeCatalog inner, final ClassLoader
7575
}
7676

7777
@Override
78-
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
78+
public void createTable(
79+
TablePath tablePath, TableDescriptor tableDescriptor, Context context)
7980
throws TableAlreadyExistException {
8081
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
81-
inner.createTable(tablePath, tableDescriptor);
82+
inner.createTable(tablePath, tableDescriptor, context);
8283
}
8384
}
8485

8586
@Override
86-
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
87+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
8788
throws TableNotExistException {
8889
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
89-
inner.alterTable(tablePath, tableChanges);
90+
inner.alterTable(tablePath, tableChanges, context);
9091
}
9192
}
9293

fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,12 @@ public LakeSource<?> createLakeSource(TablePath tablePath) {
146146
private static class TestPaimonLakeCatalog implements LakeCatalog {
147147

148148
@Override
149-
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
149+
public void createTable(
150+
TablePath tablePath, TableDescriptor tableDescriptor, Context context)
150151
throws TableAlreadyExistException {}
151152

152153
@Override
153-
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
154+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
154155
throws TableNotExistException {}
155156
}
156157
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.lakestorage;
19+
20+
import org.apache.fluss.security.acl.FlussPrincipal;
21+
22+
/** A testing implementation of {@link LakeCatalog.Context}. */
23+
public class TestingLakeCatalogContext implements LakeCatalog.Context {
24+
25+
@Override
26+
public FlussPrincipal getFlussPrincipal() {
27+
return null;
28+
}
29+
}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ protected Catalog getIcebergCatalog() {
8484
}
8585

8686
@Override
87-
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
87+
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
8888
throws TableAlreadyExistException {
8989
// convert Fluss table path to iceberg table
9090
boolean isPkTable = tableDescriptor.hasPrimaryKey();
@@ -117,7 +117,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
117117
}
118118

119119
@Override
120-
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
120+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
121121
throws TableNotExistException {
122122
throw new UnsupportedOperationException(
123123
"Alter table is not supported for Iceberg at the moment");

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.config.ConfigOptions;
2121
import org.apache.fluss.config.Configuration;
2222
import org.apache.fluss.exception.InvalidTableException;
23+
import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
2324
import org.apache.fluss.metadata.Schema;
2425
import org.apache.fluss.metadata.TableDescriptor;
2526
import org.apache.fluss.metadata.TablePath;
@@ -86,7 +87,8 @@ void testPropertyPrefixRewriting() {
8687
.build();
8788

8889
TablePath tablePath = TablePath.of(database, tableName);
89-
flussIcebergCatalog.createTable(tablePath, tableDescriptor);
90+
flussIcebergCatalog.createTable(
91+
tablePath, tableDescriptor, new TestingLakeCatalogContext());
9092

9193
Table created =
9294
flussIcebergCatalog
@@ -118,7 +120,8 @@ void testCreatePrimaryKeyTable() {
118120

119121
TablePath tablePath = TablePath.of(database, tableName);
120122

121-
flussIcebergCatalog.createTable(tablePath, tableDescriptor);
123+
flussIcebergCatalog.createTable(
124+
tablePath, tableDescriptor, new TestingLakeCatalogContext());
122125

123126
TableIdentifier tableId = TableIdentifier.of(database, tableName);
124127
Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
@@ -175,7 +178,8 @@ void testCreatePartitionedPrimaryKeyTable() {
175178

176179
TablePath tablePath = TablePath.of(database, tableName);
177180

178-
flussIcebergCatalog.createTable(tablePath, tableDescriptor);
181+
flussIcebergCatalog.createTable(
182+
tablePath, tableDescriptor, new TestingLakeCatalogContext());
179183

180184
TableIdentifier tableId = TableIdentifier.of(database, tableName);
181185
Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
@@ -254,7 +258,12 @@ void rejectsPrimaryKeyTableWithMultipleBucketKeys() {
254258

255259
TablePath tablePath = TablePath.of(database, tableName);
256260

257-
assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, tableDescriptor))
261+
assertThatThrownBy(
262+
() ->
263+
flussIcebergCatalog.createTable(
264+
tablePath,
265+
tableDescriptor,
266+
new TestingLakeCatalogContext()))
258267
.isInstanceOf(UnsupportedOperationException.class)
259268
.hasMessageContaining("Only one bucket key is supported for Iceberg");
260269
}
@@ -279,7 +288,7 @@ void testCreateLogTable() {
279288
.build();
280289

281290
TablePath tablePath = TablePath.of(database, tableName);
282-
flussIcebergCatalog.createTable(tablePath, td);
291+
flussIcebergCatalog.createTable(tablePath, td, new TestingLakeCatalogContext());
283292

284293
TableIdentifier tableId = TableIdentifier.of(database, tableName);
285294
Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
@@ -336,7 +345,7 @@ void testCreatePartitionedLogTable() {
336345
.build();
337346

338347
TablePath path = TablePath.of(database, tableName);
339-
flussIcebergCatalog.createTable(path, td);
348+
flussIcebergCatalog.createTable(path, td, new TestingLakeCatalogContext());
340349

341350
Table createdTable =
342351
flussIcebergCatalog
@@ -401,7 +410,12 @@ void rejectsLogTableWithMultipleBucketKeys() {
401410
TablePath tablePath = TablePath.of(database, tableName);
402411

403412
// Do not allow multiple bucket keys for log table
404-
assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, tableDescriptor))
413+
assertThatThrownBy(
414+
() ->
415+
flussIcebergCatalog.createTable(
416+
tablePath,
417+
tableDescriptor,
418+
new TestingLakeCatalogContext()))
405419
.isInstanceOf(UnsupportedOperationException.class)
406420
.hasMessageContaining("Only one bucket key is supported for Iceberg");
407421
}
@@ -432,7 +446,11 @@ void testIllegalPartitionKeyType(boolean isPrimaryKeyTable) throws Exception {
432446
tableDescriptor.partitionedBy(partitionKeys);
433447

434448
Assertions.assertThatThrownBy(
435-
() -> flussIcebergCatalog.createTable(t1, tableDescriptor.build()))
449+
() ->
450+
flussIcebergCatalog.createTable(
451+
t1,
452+
tableDescriptor.build(),
453+
new TestingLakeCatalogContext()))
436454
.isInstanceOf(InvalidTableException.class)
437455
.hasMessage(
438456
"Partition key only support string type for iceberg currently. Column `c1` is not string type.");

fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public LanceLakeCatalog(Configuration config) {
4343
}
4444

4545
@Override
46-
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
46+
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) {
4747
// currently, we don't support primary key table for lance
4848
if (tableDescriptor.hasPrimaryKey()) {
4949
throw new InvalidTableException(
@@ -71,7 +71,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
7171
}
7272

7373
@Override
74-
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
74+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
7575
throws TableNotExistException {
7676
throw new UnsupportedOperationException(
7777
"Alter table is not supported for Lance at the moment");

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected Catalog getPaimonCatalog() {
7777
}
7878

7979
@Override
80-
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
80+
public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
8181
throws TableAlreadyExistException {
8282
// then, create the table
8383
Identifier paimonPath = toPaimon(tablePath);
@@ -102,7 +102,7 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
102102
}
103103

104104
@Override
105-
public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
105+
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
106106
throws TableNotExistException {
107107
try {
108108
Identifier paimonPath = toPaimon(tablePath);

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.lake.paimon;
1919

2020
import org.apache.fluss.config.Configuration;
21+
import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
2122
import org.apache.fluss.metadata.Schema;
2223
import org.apache.fluss.metadata.TableChange;
2324
import org.apache.fluss.metadata.TableDescriptor;
@@ -62,14 +63,20 @@ void testAlterTableConfigs() throws Exception {
6263
assertThat(table.options().get("key")).isEqualTo(null);
6364

6465
// set the value for key
65-
flussPaimonCatalog.alterTable(tablePath, Arrays.asList(TableChange.set("key", "value")));
66+
flussPaimonCatalog.alterTable(
67+
tablePath,
68+
Arrays.asList(TableChange.set("key", "value")),
69+
new TestingLakeCatalogContext());
6670

6771
table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
6872
// we have set the value for key
6973
assertThat(table.options().get("fluss.key")).isEqualTo("value");
7074

7175
// reset the value for key
72-
flussPaimonCatalog.alterTable(tablePath, Arrays.asList(TableChange.reset("key")));
76+
flussPaimonCatalog.alterTable(
77+
tablePath,
78+
Arrays.asList(TableChange.reset("key")),
79+
new TestingLakeCatalogContext());
7380

7481
table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
7582
// we have reset the value for key
@@ -93,6 +100,6 @@ private void createTable(String database, String tableName) {
93100

94101
TablePath tablePath = TablePath.of(database, tableName);
95102

96-
flussPaimonCatalog.createTable(tablePath, td);
103+
flussPaimonCatalog.createTable(tablePath, td, new TestingLakeCatalogContext());
97104
}
98105
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.exception.TableAlreadyExistException;
3434
import org.apache.fluss.exception.TableNotPartitionedException;
3535
import org.apache.fluss.fs.FileSystem;
36+
import org.apache.fluss.lake.lakestorage.LakeCatalog;
3637
import org.apache.fluss.metadata.DataLakeFormat;
3738
import org.apache.fluss.metadata.DatabaseDescriptor;
3839
import org.apache.fluss.metadata.DeleteBehavior;
@@ -84,6 +85,7 @@
8485
import org.apache.fluss.rpc.protocol.ApiError;
8586
import org.apache.fluss.security.acl.AclBinding;
8687
import org.apache.fluss.security.acl.AclBindingFilter;
88+
import org.apache.fluss.security.acl.FlussPrincipal;
8789
import org.apache.fluss.security.acl.OperationType;
8890
import org.apache.fluss.security.acl.Resource;
8991
import org.apache.fluss.server.DynamicConfigManager;
@@ -287,7 +289,10 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
287289
if (isDataLakeEnabled(tableDescriptor)) {
288290
try {
289291
checkNotNull(lakeCatalogContainer.getLakeCatalog())
290-
.createTable(tablePath, tableDescriptor);
292+
.createTable(
293+
tablePath,
294+
tableDescriptor,
295+
new DefaultLakeCatalogContext(currentSession().getPrincipal()));
291296
} catch (TableAlreadyExistException e) {
292297
throw new LakeTableAlreadyExistException(
293298
String.format(
@@ -326,7 +331,8 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
326331
request.isIgnoreIfNotExists(),
327332
lakeCatalogContainer.getLakeCatalog(),
328333
lakeCatalogContainer.getDataLakeFormat(),
329-
lakeTableTieringManager);
334+
lakeTableTieringManager,
335+
new DefaultLakeCatalogContext(currentSession().getPrincipal()));
330336

331337
return CompletableFuture.completedFuture(new AlterTableResponse());
332338
}
@@ -757,4 +763,18 @@ private void validateTableCreationPermission(
757763
}
758764
}
759765
}
766+
767+
static class DefaultLakeCatalogContext implements LakeCatalog.Context {
768+
769+
private final FlussPrincipal flussPrincipal;
770+
771+
public DefaultLakeCatalogContext(FlussPrincipal flussPrincipal) {
772+
this.flussPrincipal = flussPrincipal;
773+
}
774+
775+
@Override
776+
public FlussPrincipal getFlussPrincipal() {
777+
return flussPrincipal;
778+
}
779+
}
760780
}

0 commit comments

Comments
 (0)