Skip to content

Commit 9df7b72

Browse files
authored
[server] Support alter table properties (#1625)
1 parent 0673881 commit 9df7b72

File tree

24 files changed

+891
-7
lines changed

24 files changed

+891
-7
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.fluss.exception.DatabaseAlreadyExistException;
2727
import org.apache.fluss.exception.DatabaseNotEmptyException;
2828
import org.apache.fluss.exception.DatabaseNotExistException;
29+
import org.apache.fluss.exception.InvalidAlterTableException;
2930
import org.apache.fluss.exception.InvalidDatabaseException;
3031
import org.apache.fluss.exception.InvalidPartitionException;
3132
import org.apache.fluss.exception.InvalidReplicationFactorException;
@@ -48,6 +49,7 @@
4849
import org.apache.fluss.metadata.ResolvedPartitionSpec;
4950
import org.apache.fluss.metadata.SchemaInfo;
5051
import org.apache.fluss.metadata.TableBucket;
52+
import org.apache.fluss.metadata.TableChange;
5153
import org.apache.fluss.metadata.TableDescriptor;
5254
import org.apache.fluss.metadata.TableInfo;
5355
import org.apache.fluss.metadata.TablePath;
@@ -235,6 +237,27 @@ CompletableFuture<Void> createTable(
235237
*/
236238
CompletableFuture<List<String>> listTables(String databaseName);
237239

240+
/**
241+
* Alter a table with the given {@code tableChanges}.
242+
*
243+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
244+
*
245+
* <ul>
246+
* <li>{@link DatabaseNotExistException} when the database does not exist.
247+
* <li>{@link TableNotExistException} when the table does not exist, and ignoreIfNotExists is
248+
* false.
249+
* <li>{@link InvalidAlterTableException} if the alter operation is invalid, such as alter set
250+
* a table option which is not supported to modify currently.
251+
* </ul>
252+
*
253+
* @param tablePath The table path of the table.
254+
* @param tableChanges The table changes.
255+
* @param ignoreIfNotExists if it is true, do nothing if table does not exist. If false, throw a
256+
* TableNotExistException.
257+
*/
258+
CompletableFuture<Void> alterTable(
259+
TablePath tablePath, List<TableChange> tableChanges, boolean ignoreIfNotExists);
260+
238261
/**
239262
* List all partitions in the given table in fluss cluster asynchronously.
240263
*

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.client.metadata.LakeSnapshot;
2323
import org.apache.fluss.client.metadata.MetadataUpdater;
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
25+
import org.apache.fluss.client.utils.FlussTableChangeProtoConverter;
2526
import org.apache.fluss.cluster.Cluster;
2627
import org.apache.fluss.cluster.ServerNode;
2728
import org.apache.fluss.exception.LeaderNotAvailableException;
@@ -33,6 +34,7 @@
3334
import org.apache.fluss.metadata.Schema;
3435
import org.apache.fluss.metadata.SchemaInfo;
3536
import org.apache.fluss.metadata.TableBucket;
37+
import org.apache.fluss.metadata.TableChange;
3638
import org.apache.fluss.metadata.TableDescriptor;
3739
import org.apache.fluss.metadata.TableInfo;
3840
import org.apache.fluss.metadata.TablePath;
@@ -41,6 +43,7 @@
4143
import org.apache.fluss.rpc.gateway.AdminGateway;
4244
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4345
import org.apache.fluss.rpc.gateway.TabletServerGateway;
46+
import org.apache.fluss.rpc.messages.AlterTableConfigsRequest;
4447
import org.apache.fluss.rpc.messages.CreateAclsRequest;
4548
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
4649
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -62,6 +65,7 @@
6265
import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
6366
import org.apache.fluss.rpc.messages.ListTablesRequest;
6467
import org.apache.fluss.rpc.messages.ListTablesResponse;
68+
import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
6569
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
6670
import org.apache.fluss.rpc.messages.PbPartitionSpec;
6771
import org.apache.fluss.rpc.messages.PbTablePath;
@@ -81,6 +85,7 @@
8185
import java.util.List;
8286
import java.util.Map;
8387
import java.util.concurrent.CompletableFuture;
88+
import java.util.stream.Collectors;
8489

8590
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
8691
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
@@ -235,6 +240,25 @@ public CompletableFuture<Void> createTable(
235240
return gateway.createTable(request).thenApply(r -> null);
236241
}
237242

243+
@Override
244+
public CompletableFuture<Void> alterTable(
245+
TablePath tablePath, List<TableChange> tableChanges, boolean ignoreIfNotExists) {
246+
tablePath.validate();
247+
AlterTableConfigsRequest request = new AlterTableConfigsRequest();
248+
249+
List<PbAlterConfigsRequestInfo> pbFlussTableChanges =
250+
tableChanges.stream()
251+
.map(FlussTableChangeProtoConverter::toPbAlterConfigsRequestInfo)
252+
.collect(Collectors.toList());
253+
254+
request.addAllConfigChanges(pbFlussTableChanges)
255+
.setIgnoreIfNotExists(ignoreIfNotExists)
256+
.setTablePath()
257+
.setDatabaseName(tablePath.getDatabaseName())
258+
.setTableName(tablePath.getTableName());
259+
return gateway.alterTableConfigs(request).thenApply(r -> null);
260+
}
261+
238262
@Override
239263
public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
240264
GetTableInfoRequest request = new GetTableInfoRequest();
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.utils;
19+
20+
import org.apache.fluss.metadata.AlterTableConfigsOpType;
21+
import org.apache.fluss.metadata.TableChange;
22+
import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
23+
24+
/** Convert {@link TableChange} to proto. */
25+
public class FlussTableChangeProtoConverter {
26+
27+
public static PbAlterConfigsRequestInfo toPbAlterConfigsRequestInfo(TableChange tableChange) {
28+
PbAlterConfigsRequestInfo info = new PbAlterConfigsRequestInfo();
29+
if (tableChange instanceof TableChange.SetOption) {
30+
TableChange.SetOption setOption = (TableChange.SetOption) tableChange;
31+
info.setConfigKey(setOption.getKey());
32+
info.setConfigValue(setOption.getValue());
33+
info.setOpType(AlterTableConfigsOpType.SET.value());
34+
} else if (tableChange instanceof TableChange.ResetOption) {
35+
TableChange.ResetOption resetOption = (TableChange.ResetOption) tableChange;
36+
info.setConfigKey(resetOption.getKey());
37+
info.setOpType(AlterTableConfigsOpType.DELETE.value());
38+
} else {
39+
throw new IllegalArgumentException(
40+
"Unsupported table change: " + tableChange.getClass());
41+
}
42+
return info;
43+
}
44+
}

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.fluss.metadata.Schema;
5656
import org.apache.fluss.metadata.SchemaInfo;
5757
import org.apache.fluss.metadata.TableBucket;
58+
import org.apache.fluss.metadata.TableChange;
5859
import org.apache.fluss.metadata.TableDescriptor;
5960
import org.apache.fluss.metadata.TableInfo;
6061
import org.apache.fluss.metadata.TablePath;
@@ -197,6 +198,72 @@ void testGetTableInfoAndSchema() throws Exception {
197198
.isBetween(timestampBeforeCreate, timestampAfterCreate);
198199
}
199200

201+
@Test
202+
void testAlterTable() throws Exception {
203+
// create table
204+
TablePath tablePath = TablePath.of("test_db", "alter_table_1");
205+
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
206+
207+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
208+
209+
TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
210+
Map<String, String> updateProperties =
211+
new HashMap<>(existingTableDescriptor.getProperties());
212+
Map<String, String> updateCustomProperties =
213+
new HashMap<>(existingTableDescriptor.getCustomProperties());
214+
updateCustomProperties.put("client.connect-timeout", "240s");
215+
216+
TableDescriptor newTableDescriptor =
217+
TableDescriptor.builder()
218+
.schema(existingTableDescriptor.getSchema())
219+
.comment(existingTableDescriptor.getComment().orElse("test table"))
220+
.partitionedBy(existingTableDescriptor.getPartitionKeys())
221+
.distributedBy(
222+
existingTableDescriptor
223+
.getTableDistribution()
224+
.get()
225+
.getBucketCount()
226+
.orElse(3),
227+
existingTableDescriptor.getBucketKeys())
228+
.properties(updateProperties)
229+
.customProperties(updateCustomProperties)
230+
.build();
231+
232+
List<TableChange> tableChanges = new ArrayList<>();
233+
TableChange tableChange = TableChange.set("client.connect-timeout", "240s");
234+
tableChanges.add(tableChange);
235+
// alter table
236+
admin.alterTable(tablePath, tableChanges, false).get();
237+
238+
TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
239+
TableDescriptor alteredTableDescriptor = alteredTableInfo.toTableDescriptor();
240+
assertThat(alteredTableDescriptor).isEqualTo(newTableDescriptor);
241+
242+
// throw exception if table not exist
243+
assertThatThrownBy(
244+
() ->
245+
admin.alterTable(
246+
TablePath.of("test_db", "alter_table_not_exist"),
247+
tableChanges,
248+
false)
249+
.get())
250+
.cause()
251+
.isInstanceOf(TableNotExistException.class);
252+
253+
// throw exception if database not exist
254+
assertThatThrownBy(
255+
() ->
256+
admin.alterTable(
257+
TablePath.of(
258+
"test_db_not_exist",
259+
"alter_table_not_exist"),
260+
tableChanges,
261+
false)
262+
.get())
263+
.cause()
264+
.isInstanceOf(DatabaseNotExistException.class);
265+
}
266+
200267
@Test
201268
void testCreateInvalidDatabaseAndTable() {
202269
assertThatThrownBy(

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import org.apache.fluss.annotation.VisibleForTesting;
2222

2323
import java.lang.reflect.Field;
24+
import java.util.Collections;
2425
import java.util.HashMap;
26+
import java.util.List;
2527
import java.util.Map;
2628

2729
/** Utilities of Fluss {@link ConfigOptions}. */
@@ -33,9 +35,12 @@ public class FlussConfigUtils {
3335
public static final String CLIENT_PREFIX = "client.";
3436
public static final String CLIENT_SECURITY_PREFIX = "client.security.";
3537

38+
public static final List<String> ALTERABLE_TABLE_CONFIG;
39+
3640
static {
3741
TABLE_OPTIONS = extractConfigOptions("table.");
3842
CLIENT_OPTIONS = extractConfigOptions("client.");
43+
ALTERABLE_TABLE_CONFIG = Collections.emptyList();
3944
}
4045

4146
@VisibleForTesting
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.exception;
19+
20+
/** Exception for invalid alter table operation. */
21+
public class InvalidAlterTableException extends ApiException {
22+
private static final long serialVersionUID = 1L;
23+
24+
public InvalidAlterTableException(String message) {
25+
this(message, null);
26+
}
27+
28+
public InvalidAlterTableException(String message, Throwable cause) {
29+
super(message, cause);
30+
}
31+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.metadata;
19+
20+
/** The operation type of altering table configurations. */
21+
public enum AlterTableConfigsOpType {
22+
SET(0),
23+
DELETE(1),
24+
APPEND(2),
25+
SUBTRACT(3);
26+
27+
public final int value;
28+
29+
AlterTableConfigsOpType(int value) {
30+
this.value = value;
31+
}
32+
33+
public static AlterTableConfigsOpType from(int opType) {
34+
switch (opType) {
35+
case 0:
36+
return SET;
37+
case 1:
38+
return DELETE;
39+
case 2:
40+
return APPEND;
41+
case 3:
42+
return SUBTRACT;
43+
default:
44+
throw new IllegalArgumentException(
45+
"Unsupported AlterTableConfigsOpType: " + opType);
46+
}
47+
}
48+
49+
public int value() {
50+
return this.value;
51+
}
52+
}

0 commit comments

Comments
 (0)