Skip to content

Commit 39ffcfb

Browse files
committed
[server] Not permitted to enable datalake for tables created before the cluster enable datalake
1 parent 9c412c9 commit 39ffcfb

File tree

2 files changed

+191
-1
lines changed

2 files changed

+191
-1
lines changed
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table;
19+
20+
import org.apache.fluss.client.admin.ClientToServerITCaseBase;
21+
import org.apache.fluss.config.ConfigOptions;
22+
import org.apache.fluss.config.cluster.AlterConfig;
23+
import org.apache.fluss.config.cluster.AlterConfigOpType;
24+
import org.apache.fluss.exception.InvalidAlterTableException;
25+
import org.apache.fluss.metadata.DataLakeFormat;
26+
import org.apache.fluss.metadata.DatabaseDescriptor;
27+
import org.apache.fluss.metadata.Schema;
28+
import org.apache.fluss.metadata.TableChange;
29+
import org.apache.fluss.metadata.TableDescriptor;
30+
import org.apache.fluss.metadata.TableInfo;
31+
import org.apache.fluss.metadata.TablePath;
32+
import org.apache.fluss.types.DataTypes;
33+
34+
import org.junit.jupiter.api.Test;
35+
36+
import java.util.Collections;
37+
import java.util.List;
38+
39+
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
40+
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
41+
import static org.assertj.core.api.Assertions.assertThat;
42+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
43+
44+
/** IT case for lake enable table. */
45+
class LakeEnableTableITCase extends ClientToServerITCaseBase {
46+
47+
@Test
48+
void testCannotEnableDatalakeForTableCreatedBeforeClusterEnabledDatalake() throws Exception {
49+
String databaseName = "test_db";
50+
String tableName = "test_table_before_datalake";
51+
TablePath tablePath = TablePath.of(databaseName, tableName);
52+
53+
// Disable datalake format for the cluster
54+
admin.alterClusterConfigs(
55+
Collections.singletonList(
56+
new AlterConfig(
57+
DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET)))
58+
.get();
59+
// Verify cluster now has no datalake format enabled
60+
assertThat(
61+
FLUSS_CLUSTER_EXTENSION
62+
.getCoordinatorServer()
63+
.getCoordinatorService()
64+
.getDataLakeFormat())
65+
.isEqualTo(null);
66+
67+
// Create database
68+
admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();
69+
70+
// Create table before cluster enables datalake format
71+
TableDescriptor tableDescriptor =
72+
TableDescriptor.builder()
73+
.schema(
74+
Schema.newBuilder()
75+
.column("c1", DataTypes.INT())
76+
.column("c2", DataTypes.STRING())
77+
.build())
78+
.distributedBy(3, "c1")
79+
.build();
80+
admin.createTable(tablePath, tableDescriptor, false).get();
81+
82+
// Verify table was created without datalake format
83+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
84+
assertThat(tableInfo.getTableConfig().getDataLakeFormat().isPresent()).isFalse();
85+
assertThat(tableInfo.getTableConfig().isDataLakeEnabled()).isFalse();
86+
87+
// Enable datalake format for the cluster
88+
admin.alterClusterConfigs(
89+
Collections.singletonList(
90+
new AlterConfig(
91+
DATALAKE_FORMAT.key(),
92+
DataLakeFormat.PAIMON.toString(),
93+
AlterConfigOpType.SET)))
94+
.get();
95+
// Verify cluster now has datalake format enabled
96+
assertThat(
97+
FLUSS_CLUSTER_EXTENSION
98+
.getCoordinatorServer()
99+
.getCoordinatorService()
100+
.getDataLakeFormat())
101+
.isEqualTo(DataLakeFormat.PAIMON);
102+
103+
// Try to enable datalake for the table created before cluster enabled datalake
104+
// This should fail with InvalidAlterTableException
105+
List<TableChange> enableDatalakeChange =
106+
Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true"));
107+
assertThatThrownBy(() -> admin.alterTable(tablePath, enableDatalakeChange, false).get())
108+
.cause()
109+
.isInstanceOf(InvalidAlterTableException.class)
110+
.hasMessageContaining(
111+
"The option 'table.datalake.enabled' cannot be altered for tables that were"
112+
+ " created before the Fluss cluster enabled datalake.");
113+
}
114+
115+
@Test
116+
void testTableWithExplicitDatalakeFormatCanEnableDatalake() throws Exception {
117+
String databaseName = "test_db";
118+
String tableName = "test_table_explicit_format";
119+
TablePath tablePath = TablePath.of(databaseName, tableName);
120+
121+
// Create database
122+
admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();
123+
124+
// Disable datalake format for the cluster
125+
admin.alterClusterConfigs(
126+
Collections.singletonList(
127+
new AlterConfig(
128+
DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET)))
129+
.get();
130+
// Verify cluster now has no datalake format enabled
131+
assertThat(
132+
FLUSS_CLUSTER_EXTENSION
133+
.getCoordinatorServer()
134+
.getCoordinatorService()
135+
.getDataLakeFormat())
136+
.isEqualTo(null);
137+
138+
// Create table with explicit datalake format set (even though cluster has it)
139+
TableDescriptor tableDescriptor =
140+
TableDescriptor.builder()
141+
.schema(
142+
Schema.newBuilder()
143+
.column("c1", DataTypes.INT())
144+
.column("c2", DataTypes.STRING())
145+
.build())
146+
.distributedBy(3, "c1")
147+
.property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.PAIMON)
148+
.build();
149+
admin.createTable(tablePath, tableDescriptor, false).get();
150+
// Verify table has datalake format
151+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
152+
assertThat(tableInfo.getTableConfig().getDataLakeFormat().isPresent()).isTrue();
153+
assertThat(tableInfo.getTableConfig().getDataLakeFormat().get())
154+
.isEqualTo(DataLakeFormat.PAIMON);
155+
156+
// Enable datalake format for the cluster
157+
admin.alterClusterConfigs(
158+
Collections.singletonList(
159+
new AlterConfig(
160+
DATALAKE_FORMAT.key(),
161+
DataLakeFormat.PAIMON.toString(),
162+
AlterConfigOpType.SET)))
163+
.get();
164+
// Verify cluster now has datalake format enabled
165+
assertThat(
166+
FLUSS_CLUSTER_EXTENSION
167+
.getCoordinatorServer()
168+
.getCoordinatorService()
169+
.getDataLakeFormat())
170+
.isEqualTo(DataLakeFormat.PAIMON);
171+
172+
// Enable datalake for the table - this should succeed although the table was created
173+
// before cluster enabled datalake, because the table has explicit datalake format set
174+
List<TableChange> enableDatalakeChange =
175+
Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true"));
176+
admin.alterTable(tablePath, enableDatalakeChange, false).get();
177+
// Verify datalake is now enabled
178+
TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get();
179+
assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue();
180+
}
181+
}

fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,24 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int
113113

114114
public static void validateAlterTableProperties(
115115
TableInfo currentTable, Set<String> tableKeysToChange, Set<String> customKeysToChange) {
116+
TableConfig currentConfig = currentTable.getTableConfig();
116117
tableKeysToChange.forEach(
117118
k -> {
118119
if (isTableStorageConfig(k) && !isAlterableTableOption(k)) {
119120
throw new InvalidAlterTableException(
120121
"The option '" + k + "' is not supported to alter yet.");
121122
}
123+
124+
if (!currentConfig.getDataLakeFormat().isPresent()
125+
&& ConfigOptions.TABLE_DATALAKE_ENABLED.key().equals(k)) {
126+
throw new InvalidAlterTableException(
127+
String.format(
128+
"The option '%s' cannot be altered for tables that were"
129+
+ " created before the Fluss cluster enabled datalake.",
130+
ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
131+
}
122132
});
123133

124-
TableConfig currentConfig = currentTable.getTableConfig();
125134
if (currentConfig.isDataLakeEnabled() && currentConfig.getDataLakeFormat().isPresent()) {
126135
String format = currentConfig.getDataLakeFormat().get().toString();
127136
customKeysToChange.forEach(

0 commit comments

Comments
 (0)