Skip to content

Commit 13999ad

Browse files
committed
[server] Persist all non-sensitive lake properties into table options
1 parent 32003fc commit 13999ad

File tree

5 files changed

+60
-56
lines changed

5 files changed

+60
-56
lines changed

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ public static boolean isTableStorageConfig(String key) {
4949
return key.startsWith(TABLE_PREFIX);
5050
}
5151

52+
public static boolean isTableLakeConfig(String dataLakeFormat, String key) {
53+
if (dataLakeFormat == null) {
54+
return false;
55+
}
56+
57+
String dataLakeConfigPrefix = TABLE_PREFIX + "datalake." + dataLakeFormat + ".";
58+
return key.startsWith(dataLakeConfigPrefix);
59+
}
60+
5261
public static boolean isAlterableTableOption(String key) {
5362
return ALTERABLE_TABLE_OPTIONS.contains(key);
5463
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,11 @@
119119

120120
import java.io.UncheckedIOException;
121121
import java.util.HashMap;
122+
import java.util.HashSet;
123+
import java.util.Iterator;
122124
import java.util.List;
123125
import java.util.Map;
126+
import java.util.Set;
124127
import java.util.concurrent.CompletableFuture;
125128
import java.util.function.Supplier;
126129
import java.util.stream.Collectors;
@@ -155,6 +158,14 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
155158
private final LakeTableTieringManager lakeTableTieringManager;
156159
private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader;
157160

161+
public static final Set<String> SENSITIVE_TABLE_OPTIONS = new HashSet<>();
162+
163+
static {
164+
SENSITIVE_TABLE_OPTIONS.add("password");
165+
SENSITIVE_TABLE_OPTIONS.add("secret");
166+
SENSITIVE_TABLE_OPTIONS.add("key");
167+
}
168+
158169
public CoordinatorService(
159170
Configuration conf,
160171
FileSystem remoteFileSystem,
@@ -315,14 +326,11 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
315326
List<TableChange> tableChanges = toTableChanges(request.getConfigChangesList());
316327
TablePropertyChanges tablePropertyChanges = toTablePropertyChanges(tableChanges);
317328

318-
LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer =
319-
lakeCatalogDynamicLoader.getLakeCatalogContainer();
320329
metadataManager.alterTableProperties(
321330
tablePath,
322331
tableChanges,
323332
tablePropertyChanges,
324333
request.isIgnoreIfNotExists(),
325-
lakeCatalogContainer.getLakeCatalog(),
326334
lakeTableTieringManager,
327335
new DefaultLakeCatalogContext(false, currentSession().getPrincipal()));
328336

@@ -404,6 +412,18 @@ private TableDescriptor applySystemDefaults(
404412
ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
405413
}
406414

415+
// add non-sensitive lake properties to the table properties
416+
if (dataLakeEnabled) {
417+
Map<String, String> tableLakeOptions =
418+
lakeCatalogDynamicLoader.getLakeCatalogContainer().getDefaultTableLakeOptions();
419+
removeSensitiveTableOptions(tableLakeOptions);
420+
if (tableLakeOptions != null && !tableLakeOptions.isEmpty()) {
421+
Map<String, String> newProperties = new HashMap<>(newDescriptor.getProperties());
422+
newProperties.putAll(tableLakeOptions);
423+
newDescriptor = newDescriptor.withProperties(newProperties);
424+
}
425+
}
426+
407427
// For tables with first_row or versioned merge engines, automatically set to IGNORE if
408428
// delete behavior is not set
409429
Configuration tableConf = Configuration.fromMap(tableDescriptor.getProperties());
@@ -427,6 +447,20 @@ private boolean isDataLakeEnabled(TableDescriptor tableDescriptor) {
427447
return Boolean.parseBoolean(dataLakeEnabledValue);
428448
}
429449

450+
public void removeSensitiveTableOptions(Map<String, String> tableLakeOptions) {
451+
if (tableLakeOptions == null || tableLakeOptions.isEmpty()) {
452+
return;
453+
}
454+
455+
Iterator<Map.Entry<String, String>> iterator = tableLakeOptions.entrySet().iterator();
456+
while (iterator.hasNext()) {
457+
String key = iterator.next().getKey().toLowerCase();
458+
if (SENSITIVE_TABLE_OPTIONS.stream().anyMatch(key::contains)) {
459+
iterator.remove();
460+
}
461+
}
462+
}
463+
430464
@Override
431465
public CompletableFuture<DropTableResponse> dropTable(DropTableRequest request) {
432466
TablePath tablePath = toTablePath(request.getTablePath());

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@
6161

6262
import java.util.Collection;
6363
import java.util.HashMap;
64-
import java.util.HashSet;
65-
import java.util.Iterator;
6664
import java.util.List;
6765
import java.util.Map;
6866
import java.util.Optional;
@@ -82,14 +80,6 @@ public class MetadataManager {
8280
private final int maxBucketNum;
8381
private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader;
8482

85-
public static final Set<String> SENSITIVE_TABLE_OPTIOINS = new HashSet<>();
86-
87-
static {
88-
SENSITIVE_TABLE_OPTIOINS.add("password");
89-
SENSITIVE_TABLE_OPTIOINS.add("secret");
90-
SENSITIVE_TABLE_OPTIOINS.add("key");
91-
}
92-
9383
/**
9484
* Creates a new metadata manager.
9585
*
@@ -325,7 +315,6 @@ public void alterTableProperties(
325315
List<TableChange> tableChanges,
326316
TablePropertyChanges tablePropertyChanges,
327317
boolean ignoreIfNotExists,
328-
@Nullable LakeCatalog lakeCatalog,
329318
LakeTableTieringManager lakeTableTieringManager,
330319
LakeCatalog.Context lakeCatalogContext) {
331320
try {
@@ -357,7 +346,6 @@ public void alterTableProperties(
357346
tableDescriptor,
358347
newDescriptor,
359348
tableChanges,
360-
lakeCatalog,
361349
lakeCatalogContext);
362350
// update the table to zk
363351
TableRegistration updatedTableRegistration =
@@ -396,8 +384,10 @@ private void preAlterTableProperties(
396384
TableDescriptor tableDescriptor,
397385
TableDescriptor newDescriptor,
398386
List<TableChange> tableChanges,
399-
LakeCatalog lakeCatalog,
400387
LakeCatalog.Context lakeCatalogContext) {
388+
LakeCatalog lakeCatalog =
389+
lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();
390+
401391
if (isDataLakeEnabled(newDescriptor)) {
402392
if (lakeCatalog == null) {
403393
throw new InvalidAlterTableException(
@@ -506,20 +496,6 @@ private boolean isDataLakeEnabled(Map<String, String> properties) {
506496
return Boolean.parseBoolean(dataLakeEnabledValue);
507497
}
508498

509-
public void removeSensitiveTableOptions(Map<String, String> tableLakeOptions) {
510-
if (tableLakeOptions == null || tableLakeOptions.isEmpty()) {
511-
return;
512-
}
513-
514-
Iterator<Map.Entry<String, String>> iterator = tableLakeOptions.entrySet().iterator();
515-
while (iterator.hasNext()) {
516-
String key = iterator.next().getKey().toLowerCase();
517-
if (SENSITIVE_TABLE_OPTIOINS.stream().anyMatch(key::contains)) {
518-
iterator.remove();
519-
}
520-
}
521-
}
522-
523499
public TableInfo getTable(TablePath tablePath) throws TableNotExistException {
524500
Optional<TableRegistration> optionalTable;
525501
try {
@@ -533,10 +509,7 @@ public TableInfo getTable(TablePath tablePath) throws TableNotExistException {
533509
}
534510
TableRegistration tableReg = optionalTable.get();
535511
SchemaInfo schemaInfo = getLatestSchema(tablePath);
536-
Map<String, String> tableLakeOptions =
537-
lakeCatalogDynamicLoader.getLakeCatalogContainer().getDefaultTableLakeOptions();
538-
removeSensitiveTableOptions(tableLakeOptions);
539-
return tableReg.toTableInfo(tablePath, schemaInfo, tableLakeOptions);
512+
return tableReg.toTableInfo(tablePath, schemaInfo);
540513
}
541514

542515
public Map<TablePath, TableInfo> getTables(Collection<TablePath> tablePaths)
@@ -559,14 +532,7 @@ public Map<TablePath, TableInfo> getTables(Collection<TablePath> tablePaths)
559532
TableRegistration tableReg = tablePath2TableRegistrations.get(tablePath);
560533
SchemaInfo schemaInfo = tablePath2SchemaInfos.get(tablePath);
561534

562-
result.put(
563-
tablePath,
564-
tableReg.toTableInfo(
565-
tablePath,
566-
schemaInfo,
567-
lakeCatalogDynamicLoader
568-
.getLakeCatalogContainer()
569-
.getDefaultTableLakeOptions()));
535+
result.put(tablePath, tableReg.toTableInfo(tablePath, schemaInfo));
570536
}
571537
} catch (Exception e) {
572538
throw new FlussRuntimeException(

fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
5151
import static org.apache.fluss.config.FlussConfigUtils.isAlterableTableOption;
52+
import static org.apache.fluss.config.FlussConfigUtils.isTableLakeConfig;
5253
import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
5354
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
5455
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -75,6 +76,13 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int
7576
// check properties should only contain table.* options,
7677
// and this cluster know it, and value is valid
7778
for (String key : tableConf.keySet()) {
79+
// skip datalake config
80+
if (isTableLakeConfig(
81+
tableDescriptor.getProperties().get(ConfigOptions.TABLE_DATALAKE_FORMAT.key()),
82+
key)) {
83+
continue;
84+
}
85+
7886
if (!TABLE_OPTIONS.containsKey(key)) {
7987
if (isTableStorageConfig(key)) {
8088
throw new InvalidConfigException(

fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.fluss.server.zk.data;
1919

20-
import org.apache.fluss.config.ConfigOptions;
2120
import org.apache.fluss.config.Configuration;
2221
import org.apache.fluss.config.TableConfig;
2322
import org.apache.fluss.metadata.Schema;
@@ -87,20 +86,8 @@ public TableConfig getTableConfig() {
8786
}
8887

8988
public TableInfo toTableInfo(TablePath tablePath, SchemaInfo schemaInfo) {
90-
return toTableInfo(tablePath, schemaInfo, null);
91-
}
92-
93-
public TableInfo toTableInfo(
94-
TablePath tablePath,
95-
SchemaInfo schemaInfo,
96-
@Nullable Map<String, String> defaultTableLakeOptions) {
9789
Configuration properties = Configuration.fromMap(this.properties);
98-
if (defaultTableLakeOptions != null) {
99-
if (properties.get(ConfigOptions.TABLE_DATALAKE_ENABLED)) {
100-
// only make the lake options visible when the datalake is enabled on the table
101-
defaultTableLakeOptions.forEach(properties::setString);
102-
}
103-
}
90+
10491
return new TableInfo(
10592
tablePath,
10693
this.tableId,

0 commit comments

Comments
 (0)